From 6b8872e3e0b696fc91100f6a4adcc3f4ea77f128 Mon Sep 17 00:00:00 2001 From: DuodenumL Date: Wed, 27 Oct 2021 10:57:10 +0800 Subject: [PATCH] user core's client pool lib (#78) --- store/core/client.go | 12 +++-- store/core/rpcpool.go | 112 ------------------------------------------ 2 files changed, 9 insertions(+), 115 deletions(-) delete mode 100644 store/core/rpcpool.go diff --git a/store/core/client.go b/store/core/client.go index deb155f..2a62501 100644 --- a/store/core/client.go +++ b/store/core/client.go @@ -6,6 +6,7 @@ import ( "time" "github.com/projecteru2/agent/types" + "github.com/projecteru2/core/client" pb "github.com/projecteru2/core/rpc/gen" "github.com/patrickmn/go-cache" @@ -14,7 +15,7 @@ import ( // Store use core to store meta type Store struct { - clientPool *ClientPool + clientPool *client.Pool config *types.Config cache *cache.Cache } @@ -24,7 +25,12 @@ var once sync.Once // New new a Store func New(ctx context.Context, config *types.Config) (*Store, error) { - clientPool, err := NewCoreRPCClientPool(ctx, config) + clientPoolConfig := &client.PoolConfig{ + EruAddrs: config.Core, + Auth: config.Auth, + ConnectionTimeout: config.GlobalConnectionTimeout, + } + clientPool, err := client.NewCoreRPCClientPool(ctx, clientPoolConfig) if err != nil { return nil, err } @@ -34,7 +40,7 @@ func New(ctx context.Context, config *types.Config) (*Store, error) { // GetClient returns a gRPC client func (c *Store) GetClient() pb.CoreRPCClient { - return c.clientPool.getClient() + return c.clientPool.GetClient() } // Init inits the core store only once diff --git a/store/core/rpcpool.go b/store/core/rpcpool.go deleted file mode 100644 index 910f3d0..0000000 --- a/store/core/rpcpool.go +++ /dev/null @@ -1,112 +0,0 @@ -package core - -import ( - "context" - "errors" - "sync" - "time" - - "github.com/projecteru2/agent/types" - "github.com/projecteru2/agent/utils" - "github.com/projecteru2/core/client" - pb "github.com/projecteru2/core/rpc/gen" - - log "github.com/sirupsen/logrus" -) - -type clientWithStatus struct { - client pb.CoreRPCClient - addr string - alive bool -} - -// ClientPool implement of RPCClientPool -type ClientPool struct { - rpcClients []*clientWithStatus -} - -func checkAlive(ctx context.Context, rpc *clientWithStatus, timeout time.Duration) bool { - var err error - utils.WithTimeout(ctx, timeout, func(ctx context.Context) { - _, err = rpc.client.Info(ctx, &pb.Empty{}) - }) - if err != nil { - log.Errorf("[ClientPool] connect to %s failed, err: %s", rpc.addr, err) - return false - } - log.Debugf("[ClientPool] connect to %s success", rpc.addr) - return true -} - -// NewCoreRPCClientPool . -func NewCoreRPCClientPool(ctx context.Context, config *types.Config) (*ClientPool, error) { - if len(config.Core) == 0 { - return nil, errors.New("core addr not set") - } - c := &ClientPool{rpcClients: []*clientWithStatus{}} - for _, addr := range config.Core { - var rpc *client.Client - var err error - utils.WithTimeout(ctx, config.GlobalConnectionTimeout, func(ctx context.Context) { - rpc, err = client.NewClient(ctx, addr, config.Auth) - }) - if err != nil { - log.Errorf("[NewCoreRPCClientPool] connect to %s failed, err: %s", addr, err) - continue - } - rpcClient := rpc.GetRPCClient() - c.rpcClients = append(c.rpcClients, &clientWithStatus{client: rpcClient, addr: addr}) - } - - // init client status - c.updateClientsStatus(ctx, config.GlobalConnectionTimeout) - - allFailed := true - for _, rpc := range c.rpcClients { - if rpc.alive { - allFailed = false - } - } - - if allFailed { - log.Error("[NewCoreRPCClientPool] all connections failed") - return nil, errors.New("all connections failed") - } - - go func() { - ticker := time.NewTicker(config.GlobalConnectionTimeout * 2) - defer ticker.Stop() - for { - select { - case <-ticker.C: - c.updateClientsStatus(ctx, config.GlobalConnectionTimeout) - case <-ctx.Done(): - return - } - } - }() - - return c, nil -} - -func (c *ClientPool) updateClientsStatus(ctx context.Context, timeout time.Duration) { - wg := &sync.WaitGroup{} - for _, rpc := range c.rpcClients { - wg.Add(1) - go func(r *clientWithStatus) { - defer wg.Done() - r.alive = checkAlive(ctx, r, timeout) - }(rpc) - } - wg.Wait() -} - -// getClient finds the first *client.Client instance with an active connection. If all connections are dead, returns the first one. -func (c *ClientPool) getClient() pb.CoreRPCClient { - for _, rpc := range c.rpcClients { - if rpc.alive { - return rpc.client - } - } - return c.rpcClients[0].client -}