Skip to content

Commit

Permalink
Merge pull request #398 from proost/issue397-refactor-distribute-to-replicas
Browse files Browse the repository at this point in the history

refactor: distribute slots to replicas
  • Loading branch information
rueian authored Nov 1, 2023
2 parents 425a16a + 29c152a commit 0242d4e
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 24 deletions.
18 changes: 11 additions & 7 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,18 @@ func (c *clusterClient) _refresh() (err error) {

slots := [16384]conn{}
for master, g := range groups {
addr := master
if c.opt.ReplicaOnly && len(g.nodes) > 1 {
addr = g.nodes[1+rand.Intn(len(g.nodes)-1)]
}
cc := conns[addr]
for _, slot := range g.slots {
for i := slot[0]; i <= slot[1]; i++ {
slots[i] = cc
nodesCount := len(g.nodes)
for _, slot := range g.slots {
for i := slot[0]; i <= slot[1]; i++ {
slots[i] = conns[g.nodes[1+rand.Intn(nodesCount-1)]]
}
}
} else {
for _, slot := range g.slots {
for i := slot[0]; i <= slot[1]; i++ {
slots[i] = conns[master]
}
}
}
}
Expand Down
142 changes: 125 additions & 17 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,57 @@ var slotsMultiResp = newResult(RedisMessage{typ: '*', values: []RedisMessage{
}},
}}, nil)

var slotsMultiRespWithMultiReplicas = newResult(RedisMessage{typ: '*', values: []RedisMessage{
{typ: '*', values: []RedisMessage{
{typ: ':', integer: 0},
{typ: ':', integer: 8192},
{typ: '*', values: []RedisMessage{ // master
{typ: '+', string: "127.0.0.1"},
{typ: ':', integer: 0},
{typ: '+', string: ""},
}},
{typ: '*', values: []RedisMessage{ // replica1
{typ: '+', string: "127.0.0.2"},
{typ: ':', integer: 1},
{typ: '+', string: ""},
}},
{typ: '*', values: []RedisMessage{ // replica2
{typ: '+', string: "127.0.0.3"},
{typ: ':', integer: 2},
{typ: '+', string: ""},
}},
{typ: '*', values: []RedisMessage{ // replica3
{typ: '+', string: "127.0.0.4"},
{typ: ':', integer: 3},
{typ: '+', string: ""},
}},
}},
{typ: '*', values: []RedisMessage{
{typ: ':', integer: 8193},
{typ: ':', integer: 16383},
{typ: '*', values: []RedisMessage{ // master
{typ: '+', string: "127.0.1.1"},
{typ: ':', integer: 0},
{typ: '+', string: ""},
}},
{typ: '*', values: []RedisMessage{ // replica1
{typ: '+', string: "127.0.1.2"},
{typ: ':', integer: 1},
{typ: '+', string: ""},
}},
{typ: '*', values: []RedisMessage{ // replica2
{typ: '+', string: "127.0.1.3"},
{typ: ':', integer: 2},
{typ: '+', string: ""},
}},
{typ: '*', values: []RedisMessage{ // replica3
{typ: '+', string: "127.0.1.4"},
{typ: ':', integer: 3},
{typ: '+', string: ""},
}},
}},
}}, nil)

var singleSlotResp = newResult(RedisMessage{typ: '*', values: []RedisMessage{
{typ: '*', values: []RedisMessage{
{typ: ':', integer: 0},
Expand Down Expand Up @@ -2273,25 +2324,82 @@ func TestClusterClientReplicaOnly_PickReplica(t *testing.T) {

func TestClusterClientReplicaOnly_PickMasterIfNoReplica(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
m := &mockConn{
DoFn: func(cmd Completed) RedisResult {
if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
return singleSlotResp
}
return RedisResult{}
},
}
t.Run("replicas should be picked", func(t *testing.T) {
m := &mockConn{
DoFn: func(cmd Completed) RedisResult {
if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
return slotsMultiResp
}
return RedisResult{}
},
}

client, err := newClusterClient(&ClientOption{InitAddress: []string{"127.0.0.1:0"}, ReplicaOnly: true}, func(dst string, opt *ClientOption) conn {
copiedM := *m
return &copiedM
client, err := newClusterClient(&ClientOption{InitAddress: []string{"127.0.0.1:0"}, ReplicaOnly: true}, func(dst string, opt *ClientOption) conn {
copiedM := *m
return &copiedM
})
if err != nil {
t.Fatalf("unexpected err %v", err)
}

if client.slots[0] != client.conns["127.0.1.1:1"] {
t.Fatalf("unexpected replica node assigned to slot 0")
}
if client.slots[8192] != client.conns["127.0.1.1:1"] {
t.Fatalf("unexpected replica node assigned to slot 8192")
}
if client.slots[8193] != client.conns["127.0.3.1:1"] {
t.Fatalf("unexpected replica node assigned to slot 8193")
}
if client.slots[16383] != client.conns["127.0.3.1:1"] {
t.Fatalf("unexpected replica node assigned to slot 16383")
}
})
if err != nil {
t.Fatalf("unexpected err %v", err)
}
t.Run("replicas should be picked", func(t *testing.T) {
if client.slots[0] != client.conns["127.0.0.1:0"] {
t.Fatalf("unexpected node assigned to slot 0")

t.Run("distributed to replicas", func(t *testing.T) {
m := &mockConn{
DoFn: func(cmd Completed) RedisResult {
if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
return slotsMultiRespWithMultiReplicas
}
return RedisResult{}
},
}

client, err := newClusterClient(&ClientOption{InitAddress: []string{"127.0.0.1:0"}, ReplicaOnly: true}, func(dst string, opt *ClientOption) conn {
copiedM := *m
return &copiedM
})
if err != nil {
t.Fatalf("unexpected err %v", err)
}

for slot := 0; slot < 8193; slot++ {
if client.slots[slot] == client.conns["127.0.0.2:1"] {
continue
}
if client.slots[slot] == client.conns["127.0.0.3:2"] {
continue
}
if client.slots[slot] == client.conns["127.0.0.4:3"] {
continue
}

t.Fatalf("unexpected replica node assigned to slot %d", slot)
}

for slot := 8193; slot < 16384; slot++ {
if client.slots[slot] == client.conns["127.0.1.2:1"] {
continue
}
if client.slots[slot] == client.conns["127.0.1.3:2"] {
continue
}
if client.slots[slot] == client.conns["127.0.1.4:3"] {
continue
}

t.Fatalf("unexpected replica node assigned to slot %d", slot)
}
})
}
Expand Down

0 comments on commit 0242d4e

Please sign in to comment.