Skip to content

Commit

Permalink
refactor(webapi): Improve agent client handling and logging
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw committed Jun 8, 2024
1 parent d83e51f commit 86c0ccc
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 15 deletions.
24 changes: 15 additions & 9 deletions flyteplugins/go/tasks/plugins/webapi/agent/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ func updateAgentRegistry(ctx context.Context, cs *ClientSet) {
}
agentDeployments = append(agentDeployments, maps.Values(cfg.AgentDeployments)...)
for _, agentDeployment := range agentDeployments {
client := cs.agentMetadataClients[agentDeployment.Endpoint]
client, ok := cs.agentMetadataClients[agentDeployment.Endpoint]
if !ok {
logger.Warningf(ctx, "Agent client not found in the clientSet for the endpoint: %v", agentDeployment.Endpoint)
continue
}

finalCtx, cancel := getFinalContext(ctx, "ListAgents", agentDeployment)
defer cancel()
Expand Down Expand Up @@ -162,7 +166,7 @@ func updateAgentRegistry(ctx context.Context, cs *ClientSet) {
SetAgentRegistry(agentRegistry)
}

func initializeAgentClientSets(ctx context.Context) *ClientSet {
func getAgentClientSets(ctx context.Context) *ClientSet {
logger.Infof(ctx, "Initializing agent clients")

clientSet := &ClientSet{
Expand All @@ -178,17 +182,19 @@ func initializeAgentClientSets(ctx context.Context) *ClientSet {
agentDeployments = append(agentDeployments, &cfg.DefaultAgent)
}
agentDeployments = append(agentDeployments, maps.Values(cfg.AgentDeployments)...)
for _, agentService := range agentDeployments {
if _, ok := clientSet.agentMetadataClients[agentService.Endpoint]; ok {
for _, agentDeployment := range agentDeployments {
if _, ok := clientSet.agentMetadataClients[agentDeployment.Endpoint]; ok {
logger.Infof(ctx, "Agent client already initialized for [%v]", agentDeployment.Endpoint)
continue
}
conn, err := getGrpcConnection(ctx, agentService)
conn, err := getGrpcConnection(ctx, agentDeployment)
if err != nil {
logger.Warningf(ctx, "failed to create connection to agent: [%v] with error: [%v]", agentService, err)
logger.Errorf(ctx, "failed to create connection to agent: [%v] with error: [%v]", agentDeployment, err)
continue
}
clientSet.syncAgentClients[agentService.Endpoint] = service.NewSyncAgentServiceClient(conn)
clientSet.asyncAgentClients[agentService.Endpoint] = service.NewAsyncAgentServiceClient(conn)
clientSet.agentMetadataClients[agentService.Endpoint] = service.NewAgentMetadataServiceClient(conn)
clientSet.syncAgentClients[agentDeployment.Endpoint] = service.NewSyncAgentServiceClient(conn)
clientSet.asyncAgentClients[agentDeployment.Endpoint] = service.NewAsyncAgentServiceClient(conn)
clientSet.agentMetadataClients[agentDeployment.Endpoint] = service.NewAgentMetadataServiceClient(conn)
}
return clientSet
}
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/webapi/agent/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestInitializeClients(t *testing.T) {
syncAgentClients: make(map[string]service.SyncAgentServiceClient),
agentMetadataClients: make(map[string]service.AgentMetadataServiceClient),
}
cs = initializeAgentClientSets(ctx)
cs = getAgentClientSets(ctx)
_, ok := cs.syncAgentClients["y"]
assert.True(t, ok)
_, ok = cs.asyncAgentClients["x"]
Expand Down
9 changes: 4 additions & 5 deletions flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func GetAgentRegistry() Registry {

func SetAgentRegistry(r Registry) {
mu.Lock()
defer mu.Unlock()
agentRegistry = r
mu.Unlock()
}

type Plugin struct {
Expand Down Expand Up @@ -342,9 +342,8 @@ func (p Plugin) getAsyncAgentClient(ctx context.Context, agent *Deployment) (ser

func (p Plugin) watchAgents(ctx context.Context) {
go wait.Until(func() {
mu.Lock()
defer mu.Unlock()
updateAgentRegistry(ctx, p.cs)
clientSet := getAgentClientSets(ctx)
updateAgentRegistry(ctx, clientSet)
}, p.cfg.PollInterval.Duration, ctx.Done())
}

Expand Down Expand Up @@ -396,7 +395,7 @@ func newAgentPlugin() webapi.PluginEntry {
ctx := context.Background()
cfg := GetConfig()

clientSet := initializeAgentClientSets(ctx)
clientSet := getAgentClientSets(ctx)
updateAgentRegistry(ctx, clientSet)
supportedTaskTypes := append(maps.Keys(GetAgentRegistry()), cfg.SupportedTaskTypes...)

Expand Down

0 comments on commit 86c0ccc

Please sign in to comment.