Skip to content

Commit

Permalink
refactor: simplify cluster refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
rueian committed Sep 18, 2023
1 parent 3f8295d commit 658e982
Showing 1 changed file with 23 additions and 36 deletions.
59 changes: 23 additions & 36 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,66 +140,53 @@ func (c *clusterClient) refresh() (err error) {
type clusterslots struct {
reply RedisResult
addr string
ver int
}

func (s clusterslots) parse(tls bool) map[string]group {
if s.ver < 7 {
return parseSlots(s.reply.val, s.addr)
}
return parseShards(s.reply.val, s.addr, tls)
}

func getClusterSlots(c conn) clusterslots {
if c.Version() < 7 {
return clusterslots{reply: c.Do(context.Background(), cmds.SlotCmd), addr: c.Addr(), ver: c.Version()}
}
return clusterslots{reply: c.Do(context.Background(), cmds.ShardsCmd), addr: c.Addr(), ver: c.Version()}
}

func (c *clusterClient) _refresh() (err error) {
type versionedResult struct {
version int
reply RedisResult
addr string
}
var (
reply RedisMessage
addr string
version int
)
c.mu.RLock()
results := make(chan versionedResult, len(c.conns))
results := make(chan clusterslots, len(c.conns))
pending := make([]conn, 0, len(c.conns))
for _, cc := range c.conns {
pending = append(pending, cc)
}
c.mu.RUnlock()

var result clusterslots
for i := 0; i < cap(results); i++ {
if i&3 == 0 { // batch CLUSTER SLOTS/CLUSTER SHARDS for every 4 connections
for j := i; j < i+4 && j < len(pending); j++ {
go func(c conn) {
var reply RedisResult
if c.Version() < 7 {
reply = c.Do(context.Background(), cmds.SlotCmd)
} else {
reply = c.Do(context.Background(), cmds.ShardsCmd)
}
results <- versionedResult{
version: c.Version(),
reply: reply,
addr: c.Addr(),
}
results <- getClusterSlots(c)
}(pending[j])
}
}
r := <-results
version = r.version
addr = r.addr
reply, err = r.reply.ToMessage()
if len(reply.values) != 0 {
result = <-results
err = result.reply.Error()
if len(result.reply.val.values) != 0 {
break
}
}
pending = nil

if err != nil {
return err
}
pending = nil

var groups map[string]group
if version < 7 {
groups = parseSlots(reply, addr)
} else {
groups = parseShards(reply, addr, c.opt.TLSConfig != nil)
}

groups := result.parse(c.opt.TLSConfig != nil)
conns := make(map[string]conn, len(groups))
for _, g := range groups {
for _, addr := range g.nodes {
Expand Down

0 comments on commit 658e982

Please sign in to comment.