From bba8c11d886b12a9733e46ca319c9550adebcf46 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Fri, 14 Jun 2024 12:01:51 +0800 Subject: [PATCH] [flyteagent] Remove redundant code in Agent (#5454) * Remove Redundant code in Agent Signed-off-by: Future-Outlier * improve error messages Signed-off-by: Future-Outlier * change to Signed-off-by: Future-Outlier * Remove defer func() by pingsu's advice. Signed-off-by: Future-Outlier Co-authored-by: pingsutw --------- Signed-off-by: Future-Outlier Co-authored-by: pingsutw --- .../go/tasks/plugins/webapi/agent/client.go | 17 +++++------------ .../go/tasks/plugins/webapi/agent/plugin.go | 7 ++----- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/webapi/agent/client.go b/flyteplugins/go/tasks/plugins/webapi/agent/client.go index d8c8b055dc..e698d32121 100644 --- a/flyteplugins/go/tasks/plugins/webapi/agent/client.go +++ b/flyteplugins/go/tasks/plugins/webapi/agent/client.go @@ -62,19 +62,12 @@ func getGrpcConnection(ctx context.Context, agent *Deployment) (*grpc.ClientConn if err != nil { return nil, err } - defer func() { - if err != nil { - if cerr := conn.Close(); cerr != nil { - grpclog.Infof("Failed to close conn to %s: %v", agent, cerr) - } - return + + go func() { + <-ctx.Done() + if cerr := conn.Close(); cerr != nil { + grpclog.Infof("Failed to close conn to %s: %v", agent, cerr) } - go func() { - <-ctx.Done() - if cerr := conn.Close(); cerr != nil { - grpclog.Infof("Failed to close conn to %s: %v", agent, cerr) - } - }() }() return conn, nil diff --git a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go index 368dffcef4..99a83aeb3b 100644 --- a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go @@ -183,6 +183,7 @@ func (p Plugin) ExecuteTaskSync( in, err := stream.Recv() if err != nil { + logger.Errorf(ctx, "Failed to write output with err %s", err.Error()) return nil, nil, err } if in.GetHeader() == nil { @@ -193,11 +194,7 @@ func (p Plugin) ExecuteTaskSync( resource := in.GetHeader().GetResource() if err := stream.CloseSend(); err != nil { - return nil, nil, err - } - - if err != nil { - logger.Errorf(ctx, "Failed to write output with err %s", err.Error()) + logger.Errorf(ctx, "Failed to close stream with err %s", err.Error()) return nil, nil, err }