Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert add supported task types for agent service by default for task types #4162

Merged
4 changes: 4 additions & 0 deletions flyteplugins/go/tasks/plugins/webapi/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ var (
Insecure: true,
DefaultTimeout: config.Duration{Duration: 10 * time.Second},
},
SupportedTaskTypes: []string{"task_type_1", "task_type_2"},
}

configSection = pluginsConfig.MustRegisterSubSection("agent-service", &defaultConfig)
Expand All @@ -65,6 +66,9 @@ type Config struct {

// Maps task types to their agents. {TaskType: AgentId}
AgentForTaskTypes map[string]string `json:"agentForTaskTypes" pflag:"-,"`

// SupportedTaskTypes is a list of task types that are supported by this plugin.
SupportedTaskTypes []string `json:"supportedTaskTypes" pflag:"-,Defines a list of task types that are supported by this plugin."`
}

type Agent struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestEndToEnd(t *testing.T) {
tr.OnRead(context.Background()).Return(nil, fmt.Errorf("read fail"))
tCtx.OnTaskReader().Return(tr)

agentPlugin := newAgentPlugin(SupportedTaskTypes{})
agentPlugin := newAgentPlugin()
pluginEntry := pluginmachinery.CreateRemotePlugin(agentPlugin)
plugin, err := pluginEntry.LoadPlugin(context.TODO(), newFakeSetupContext("test3"))
assert.NoError(t, err)
Expand Down
12 changes: 4 additions & 8 deletions flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (

type GetClientFunc func(ctx context.Context, agent *Agent, connectionCache map[*Agent]*grpc.ClientConn) (service.AsyncAgentServiceClient, error)

type TaskType = string
type SupportedTaskTypes []TaskType
type Plugin struct {
metricScope promutils.Scope
cfg *Config
Expand Down Expand Up @@ -298,10 +296,8 @@ func getFinalContext(ctx context.Context, operation string, agent *Agent) (conte
return context.WithTimeout(ctx, timeout)
}

func newAgentPlugin(supportedTaskTypes SupportedTaskTypes) webapi.PluginEntry {
if len(supportedTaskTypes) == 0 {
supportedTaskTypes = SupportedTaskTypes{"default_supported_task_type"}
}
func newAgentPlugin() webapi.PluginEntry {
supportedTaskTypes := GetConfig().SupportedTaskTypes

return webapi.PluginEntry{
ID: "agent-service",
Expand All @@ -317,9 +313,9 @@ func newAgentPlugin(supportedTaskTypes SupportedTaskTypes) webapi.PluginEntry {
}
}

func RegisterAgentPlugin(supportedTaskTypes SupportedTaskTypes) {
func RegisterAgentPlugin() {
gob.Register(ResourceMetaWrapper{})
gob.Register(ResourceWrapper{})

pluginmachinery.PluginRegistry().RegisterRemotePlugin(newAgentPlugin(supportedTaskTypes))
pluginmachinery.PluginRegistry().RegisterRemotePlugin(newAgentPlugin())
}
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/webapi/agent/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestPlugin(t *testing.T) {
})

t.Run("test newAgentPlugin", func(t *testing.T) {
p := newAgentPlugin(SupportedTaskTypes{})
p := newAgentPlugin()
assert.NotNil(t, p)
assert.Equal(t, "agent-service", p.ID)
assert.NotNil(t, p.PluginLoader)
Expand Down
4 changes: 1 addition & 3 deletions flytepropeller/pkg/controller/nodes/task/plugin_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/k8s"
)

const AgentServiceKey = "agent-service"

var once sync.Once

func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPluginConfig, pr PluginRegistryIface) (enabledPlugins []core.PluginEntry, defaultForTaskTypes map[pluginID][]taskType, err error) {
Expand All @@ -26,8 +24,8 @@ func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPlu
}

// Register the GRPC plugin after the config is loaded
once.Do(func() { agent.RegisterAgentPlugin() })
pluginsConfigMeta, err := cfg.GetEnabledPlugins()
once.Do(func() { agent.RegisterAgentPlugin(pluginsConfigMeta.AllDefaultForTaskTypes[AgentServiceKey]) })

if err != nil {
return nil, nil, err
Expand Down
1 change: 0 additions & 1 deletion rsts/deployment/agents/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,3 @@ Discover the process of setting up Agents for Flyte.

bigquery
mmcloud

2 changes: 1 addition & 1 deletion rsts/deployment/plugins/webapi/databricks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -380,4 +380,4 @@ Wait for the upgrade to complete. You can check the status of the deployment pod

Make sure you enable `custom containers
<https://docs.databricks.com/administration-guide/clusters/container-services.html>`__
on your Databricks cluster before you trigger the workflow.
on your Databricks cluster before you trigger the workflow.