Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watch agent metadata service #5017

Merged
merged 22 commits into from
Jun 8, 2024
Prev Previous commit
Next Next commit
defer
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw committed Mar 8, 2024
commit 32a907e84241e5699593d95b3ce549ce6a6d9b38
4 changes: 2 additions & 2 deletions flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@
return nil
}
metadata := taskCtx.ResourceMeta().(ResourceMetaWrapper)
agent, _ := getFinalAgent(&metadata.TaskCategory, p.cfg)

Check warning on line 225 in flyteplugins/go/tasks/plugins/webapi/agent/plugin.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/webapi/agent/plugin.go#L225

Added line #L225 was not covered by tests

client, err := p.getAsyncAgentClient(ctx, agent)
if err != nil {
Expand Down Expand Up @@ -318,13 +318,13 @@
return client, nil
}

func (p Plugin) watchAgents(ctx context.Context) {
go wait.Until(func() {
mu.Lock()
defer mu.Unlock()
updateAgentClientSets(ctx, p.cs)
agentRegistry = updateAgentRegistry(ctx, p.cs)
mu.Unlock()
}, p.cfg.PollInterval.Duration, ctx.Done())

Check warning on line 327 in flyteplugins/go/tasks/plugins/webapi/agent/plugin.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/webapi/agent/plugin.go#L321-L327

Added lines #L321 - L327 were not covered by tests
}

func writeOutput(ctx context.Context, taskCtx webapi.StatusContext, outputs *flyteIdl.LiteralMap) error {
Expand All @@ -351,10 +351,10 @@

func getFinalAgent(taskCategory *admin.TaskCategory, cfg *Config) (*Deployment, bool) {
mu.RLock()
defer mu.RUnlock()
if agent, exists := agentRegistry[taskCategory.Name][taskCategory.Version]; exists {
return agent.AgentDeployment, agent.IsSync
}
mu.RUnlock()
return &cfg.DefaultAgent, false
}

Expand All @@ -371,29 +371,29 @@
}

func newAgentPlugin() webapi.PluginEntry {
ctx := context.Background()
cfg := GetConfig()

Check warning on line 375 in flyteplugins/go/tasks/plugins/webapi/agent/plugin.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/webapi/agent/plugin.go#L374-L375

Added lines #L374 - L375 were not covered by tests

clientSet := &ClientSet{
asyncAgentClients: make(map[string]service.AsyncAgentServiceClient),
syncAgentClients: make(map[string]service.SyncAgentServiceClient),
agentMetadataClients: make(map[string]service.AgentMetadataServiceClient),

Check warning on line 380 in flyteplugins/go/tasks/plugins/webapi/agent/plugin.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/webapi/agent/plugin.go#L377-L380

Added lines #L377 - L380 were not covered by tests
}
updateAgentClientSets(ctx, clientSet)
agentRegistry := updateAgentRegistry(ctx, clientSet)

Check warning on line 383 in flyteplugins/go/tasks/plugins/webapi/agent/plugin.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/webapi/agent/plugin.go#L382-L383

Added lines #L382 - L383 were not covered by tests
supportedTaskTypes := append(maps.Keys(agentRegistry), cfg.SupportedTaskTypes...)

return webapi.PluginEntry{
ID: "agent-service",
SupportedTaskTypes: supportedTaskTypes,
PluginLoader: func(ctx context.Context, iCtx webapi.PluginSetupContext) (webapi.AsyncPlugin, error) {
plugin := &Plugin{
metricScope: iCtx.MetricsScope(),
cfg: cfg,
cs: clientSet,
}
plugin.watchAgents(ctx)
return plugin, nil

Check warning on line 396 in flyteplugins/go/tasks/plugins/webapi/agent/plugin.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/webapi/agent/plugin.go#L390-L396

Added lines #L390 - L396 were not covered by tests
},
}
}
Expand Down
Loading