diff --git a/cluster.go b/cluster.go index 48d41268..772b5864 100644 --- a/cluster.go +++ b/cluster.go @@ -233,14 +233,18 @@ func (c *clusterClient) _refresh() (err error) { slots := [16384]conn{} for master, g := range groups { - addr := master if c.opt.ReplicaOnly && len(g.nodes) > 1 { - addr = g.nodes[1+rand.Intn(len(g.nodes)-1)] - } - cc := conns[addr] - for _, slot := range g.slots { - for i := slot[0]; i <= slot[1]; i++ { - slots[i] = cc + nodesCount := len(g.nodes) + for _, slot := range g.slots { + for i := slot[0]; i <= slot[1]; i++ { + slots[i] = conns[g.nodes[1+rand.Intn(nodesCount-1)]] + } + } + } else { + for _, slot := range g.slots { + for i := slot[0]; i <= slot[1]; i++ { + slots[i] = conns[master] + } } } } diff --git a/cluster_test.go b/cluster_test.go index 29b7dee2..0da1515d 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -63,6 +63,57 @@ var slotsMultiResp = newResult(RedisMessage{typ: '*', values: []RedisMessage{ }}, }}, nil) +var slotsMultiRespWithMultiReplicas = newResult(RedisMessage{typ: '*', values: []RedisMessage{ + {typ: '*', values: []RedisMessage{ + {typ: ':', integer: 0}, + {typ: ':', integer: 8192}, + {typ: '*', values: []RedisMessage{ // master + {typ: '+', string: "127.0.0.1"}, + {typ: ':', integer: 0}, + {typ: '+', string: ""}, + }}, + {typ: '*', values: []RedisMessage{ // replica1 + {typ: '+', string: "127.0.0.2"}, + {typ: ':', integer: 1}, + {typ: '+', string: ""}, + }}, + {typ: '*', values: []RedisMessage{ // replica2 + {typ: '+', string: "127.0.0.3"}, + {typ: ':', integer: 2}, + {typ: '+', string: ""}, + }}, + {typ: '*', values: []RedisMessage{ // replica3 + {typ: '+', string: "127.0.0.4"}, + {typ: ':', integer: 3}, + {typ: '+', string: ""}, + }}, + }}, + {typ: '*', values: []RedisMessage{ + {typ: ':', integer: 8193}, + {typ: ':', integer: 16383}, + {typ: '*', values: []RedisMessage{ // master + {typ: '+', string: "127.0.1.1"}, + {typ: ':', integer: 0}, + {typ: '+', string: ""}, + }}, + {typ: '*', values: []RedisMessage{ // replica1 + {typ: '+', string: "127.0.1.2"}, + {typ: ':', integer: 1}, + {typ: '+', string: ""}, + }}, + {typ: '*', values: []RedisMessage{ // replica2 + {typ: '+', string: "127.0.1.3"}, + {typ: ':', integer: 2}, + {typ: '+', string: ""}, + }}, + {typ: '*', values: []RedisMessage{ // replica3 + {typ: '+', string: "127.0.1.4"}, + {typ: ':', integer: 3}, + {typ: '+', string: ""}, + }}, + }}, +}}, nil) + var singleSlotResp = newResult(RedisMessage{typ: '*', values: []RedisMessage{ {typ: '*', values: []RedisMessage{ {typ: ':', integer: 0}, @@ -2273,25 +2324,82 @@ func TestClusterClientReplicaOnly_PickReplica(t *testing.T) { func TestClusterClientReplicaOnly_PickMasterIfNoReplica(t *testing.T) { defer ShouldNotLeaked(SetupLeakDetection()) - m := &mockConn{ - DoFn: func(cmd Completed) RedisResult { - if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { - return singleSlotResp - } - return RedisResult{} - }, - } + t.Run("replicas should be picked", func(t *testing.T) { + m := &mockConn{ + DoFn: func(cmd Completed) RedisResult { + if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { + return slotsMultiResp + } + return RedisResult{} + }, + } - client, err := newClusterClient(&ClientOption{InitAddress: []string{"127.0.0.1:0"}, ReplicaOnly: true}, func(dst string, opt *ClientOption) conn { - copiedM := *m - return &copiedM + client, err := newClusterClient(&ClientOption{InitAddress: []string{"127.0.0.1:0"}, ReplicaOnly: true}, func(dst string, opt *ClientOption) conn { + copiedM := *m + return &copiedM + }) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + + if client.slots[0] != client.conns["127.0.1.1:1"] { + t.Fatalf("unexpected replica node assigned to slot 0") + } + if client.slots[8192] != client.conns["127.0.1.1:1"] { + t.Fatalf("unexpected replica node assigned to slot 8192") + } + if client.slots[8193] != client.conns["127.0.3.1:1"] { + t.Fatalf("unexpected replica node assigned to slot 8193") + } + if client.slots[16383] != client.conns["127.0.3.1:1"] { + t.Fatalf("unexpected replica node assigned to slot 16383") + } }) - if err != nil { - t.Fatalf("unexpected err %v", err) - } - t.Run("replicas should be picked", func(t *testing.T) { - if client.slots[0] != client.conns["127.0.0.1:0"] { - t.Fatalf("unexpected node assigned to slot 0") + + t.Run("distributed to replicas", func(t *testing.T) { + m := &mockConn{ + DoFn: func(cmd Completed) RedisResult { + if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" { + return slotsMultiRespWithMultiReplicas + } + return RedisResult{} + }, + } + + client, err := newClusterClient(&ClientOption{InitAddress: []string{"127.0.0.1:0"}, ReplicaOnly: true}, func(dst string, opt *ClientOption) conn { + copiedM := *m + return &copiedM + }) + if err != nil { + t.Fatalf("unexpected err %v", err) + } + + for slot := 0; slot < 8193; slot++ { + if client.slots[slot] == client.conns["127.0.0.2:1"] { + continue + } + if client.slots[slot] == client.conns["127.0.0.3:2"] { + continue + } + if client.slots[slot] == client.conns["127.0.0.4:3"] { + continue + } + + t.Fatalf("unexpected replica node assigned to slot %d", slot) + } + + for slot := 8193; slot < 16384; slot++ { + if client.slots[slot] == client.conns["127.0.1.2:1"] { + continue + } + if client.slots[slot] == client.conns["127.0.1.3:2"] { + continue + } + if client.slots[slot] == client.conns["127.0.1.4:3"] { + continue + } + + t.Fatalf("unexpected replica node assigned to slot %d", slot) } }) }