From ef9cfe8fce9cb99d786eeb738e0679772064bab4 Mon Sep 17 00:00:00 2001 From: Future Outlier Date: Tue, 19 Dec 2023 10:35:09 +0800 Subject: [PATCH] use grpc code status to handle case list agent method not implemented Signed-off-by: Future Outlier --- .github/workflows/single-binary.yml | 2 +- .../tasks/plugins/array/awsbatch/executor.go | 2 +- .../go/tasks/plugins/array/k8s/management.go | 8 ++--- .../go/tasks/plugins/webapi/agent/plugin.go | 29 ++++++++++++------- 4 files changed, 24 insertions(+), 17 deletions(-) diff --git a/.github/workflows/single-binary.yml b/.github/workflows/single-binary.yml index b3c924b0f9..9a2c19a52c 100644 --- a/.github/workflows/single-binary.yml +++ b/.github/workflows/single-binary.yml @@ -170,7 +170,7 @@ jobs: cpu: "0" memory: "0" EOF - flytectl demo start --image flyte-sandbox-bundled:local --disable-agent --imagePullPolicy Never + flytectl demo start --image flyte-sandbox-bundled:local --imagePullPolicy Never - name: Install Python dependencies run: | python -m pip install --upgrade pip diff --git a/flyteplugins/go/tasks/plugins/array/awsbatch/executor.go b/flyteplugins/go/tasks/plugins/array/awsbatch/executor.go index 6c7a858be8..1e98736129 100644 --- a/flyteplugins/go/tasks/plugins/array/awsbatch/executor.go +++ b/flyteplugins/go/tasks/plugins/array/awsbatch/executor.go @@ -78,7 +78,7 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c case arrayCore.PhaseAssembleFinalOutput: pluginState.State, err = array.AssembleFinalOutputs(ctx, e.outputAssembler, tCtx, arrayCore.PhaseSuccess, version+1, pluginState.State) - + case arrayCore.PhaseAbortSubTasks: fallthrough diff --git a/flyteplugins/go/tasks/plugins/array/k8s/management.go b/flyteplugins/go/tasks/plugins/array/k8s/management.go index 510f202e1a..d6abaaf74b 100644 --- a/flyteplugins/go/tasks/plugins/array/k8s/management.go +++ b/flyteplugins/go/tasks/plugins/array/k8s/management.go @@ -382,10 +382,10 @@ func TerminateSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, kube messageCollector.Collect(childIdx, err.Error()) } else { externalResources = append(externalResources, &core.ExternalResource{ - ExternalID: stCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), - Index: uint32(originalIdx), - RetryAttempt: uint32(retryAttempt), - Phase: core.PhaseAborted, + ExternalID: stCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), + Index: uint32(originalIdx), + RetryAttempt: uint32(retryAttempt), + Phase: core.PhaseAborted, }) } } diff --git a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go index 34a950ce39..c4f5fa0c70 100644 --- a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go @@ -9,9 +9,11 @@ import ( "golang.org/x/exp/maps" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/status" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" flyteIdl "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" @@ -36,7 +38,7 @@ type Plugin struct { cfg *Config getClient GetClientFunc connectionCache map[*Agent]*grpc.ClientConn - agentRegistry map[string]map[bool]*Agent // map[taskType][isSync] => Agent + agentRegistry map[string]*Agent // map[taskType] => Agent } type ResourceWrapper struct { @@ -326,14 +328,13 @@ func getFinalContext(ctx context.Context, operation string, agent *Agent) (conte return context.WithTimeout(ctx, timeout) } -func initializeAgentRegistry(cfg *Config, connectionCache map[*Agent]*grpc.ClientConn, getAgentMetadataClientFunc GetAgentMetadataClientFunc) (map[string]map[bool]*Agent, error) { - agentRegistry := make(map[string]map[bool]*Agent) +func initializeAgentRegistry(cfg *Config, connectionCache map[*Agent]*grpc.ClientConn, getAgentMetadataClientFunc GetAgentMetadataClientFunc) (map[string]*Agent, error) { + agentRegistry := make(map[string]*Agent) var agentDeployments []*Agent // Ensure that the old configuration is backward compatible for taskType, agentID := range cfg.AgentForTaskTypes { - agentRegistry[taskType] = make(map[bool]*Agent) - agentRegistry[taskType][false] = cfg.Agents[agentID] + agentRegistry[taskType] = cfg.Agents[agentID] } if len(cfg.DefaultAgent.Endpoint) != 0 { @@ -351,19 +352,25 @@ func initializeAgentRegistry(cfg *Config, connectionCache map[*Agent]*grpc.Clien res, err := client.ListAgent(finalCtx, &admin.ListAgentsRequest{}) if err != nil { + grpc_status, ok := status.FromError(err) + if grpc_status.Code() == codes.Unimplemented { + // we should not panic here, as we want to continue to support old agent settings + logger.Infof(context.Background(), "list agent method not implemented for agent: [%v]", agentDeployment) + continue + } + + if !ok { + return nil, fmt.Errorf("failed to list agent with a non-gRPC error : [%v]", err) + } + return nil, fmt.Errorf("failed to list agent with error: [%v]", err) } agents := res.GetAgents() for _, agent := range agents { supportedTaskTypes := agent.SupportedTaskTypes - isSync := agent.IsSync - for _, supportedTaskType := range supportedTaskTypes { - if _, ok := agentRegistry[supportedTaskType]; !ok { - agentRegistry[supportedTaskType] = make(map[bool]*Agent) - } - agentRegistry[supportedTaskType][isSync] = agentDeployment + agentRegistry[supportedTaskType] = agentDeployment } } }