Skip to content

Commit

Permalink
perf: construct virtual node with skip list to optimize consistent ha…
Browse files Browse the repository at this point in the history
…sh rebalance
  • Loading branch information
ppzqh committed Jul 4, 2024
1 parent 90325c7 commit 6d92cfb
Show file tree
Hide file tree
Showing 6 changed files with 697 additions and 109 deletions.
92 changes: 13 additions & 79 deletions pkg/loadbalance/consist.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package loadbalance

import (
"context"
"github.com/cloudwego/kitex/pkg/loadbalance/newconsist"
"sort"
"sync"
"time"
Expand All @@ -26,7 +27,6 @@ import (
"golang.org/x/sync/singleflight"

"github.com/cloudwego/kitex/pkg/discovery"
"github.com/cloudwego/kitex/pkg/utils"
)

/*
Expand Down Expand Up @@ -136,7 +136,7 @@ func (v *vNodeType) Swap(i, j int) {

type consistPicker struct {
cb *consistBalancer
info *consistInfo
info *newconsist.ConsistInfo
// index int
// result *consistResult
}
Expand All @@ -159,15 +159,14 @@ func (cp *consistPicker) Recycle() {

// Next is not concurrency safe.
func (cp *consistPicker) Next(ctx context.Context, request interface{}) discovery.Instance {
if len(cp.info.realNodes) == 0 {
if cp.info.IsEmpty() {
return nil
}
key := cp.cb.opt.GetKey(ctx, request)
if key == "" {
return nil
}
res := buildConsistResult(cp.info, xxhash3.HashString(key))
return res.Primary
return cp.info.BuildConsistentResult(xxhash3.HashString(key))
// Todo(DMwangnima): Optimise Replica-related logic
// This comment part is previous implementation considering connecting to Replica
// Since we would create a new picker each time, the Replica logic is unreachable, so just comment it out for now
Expand Down Expand Up @@ -255,7 +254,7 @@ func NewConsistBalancer(opt ConsistentHashOption) Loadbalancer {

// GetPicker implements the Loadbalancer interface.
func (cb *consistBalancer) GetPicker(e discovery.Result) Picker {
var ci *consistInfo
var ci *newconsist.ConsistInfo
if e.Cacheable {
cii, ok := cb.cachedConsistInfo.Load(e.CacheKey)
if !ok {
Expand All @@ -264,7 +263,7 @@ func (cb *consistBalancer) GetPicker(e discovery.Result) Picker {
})
cb.cachedConsistInfo.Store(e.CacheKey, cii)
}
ci = cii.(*consistInfo)
ci = cii.(*newconsist.ConsistInfo)
} else {
ci = cb.newConsistInfo(e)
}
Expand All @@ -274,75 +273,8 @@ func (cb *consistBalancer) GetPicker(e discovery.Result) Picker {
return picker
}

func (cb *consistBalancer) newConsistInfo(e discovery.Result) *consistInfo {
ci := &consistInfo{}
ci.realNodes, ci.virtualNodes = cb.buildNodes(e.Instances)
return ci
}

func (cb *consistBalancer) buildNodes(ins []discovery.Instance) ([]realNode, []virtualNode) {
ret := make([]realNode, len(ins))
for i := range ins {
ret[i].Ins = ins[i]
}
return ret, cb.buildVirtualNodes(ret)
}

func (cb *consistBalancer) buildVirtualNodes(rNodes []realNode) []virtualNode {
totalLen := 0
for i := range rNodes {
totalLen += cb.getVirtualNodeLen(rNodes[i])
}

ret := make([]virtualNode, totalLen)
if totalLen == 0 {
return ret
}
maxLen, maxSerial := 0, 0
for i := range rNodes {
if len(rNodes[i].Ins.Address().String()) > maxLen {
maxLen = len(rNodes[i].Ins.Address().String())
}
if vNodeLen := cb.getVirtualNodeLen(rNodes[i]); vNodeLen > maxSerial {
maxSerial = vNodeLen
}
}
l := maxLen + 1 + utils.GetUIntLen(uint64(maxSerial)) // "$address + # + itoa(i)"
// pre-allocate []byte here, and reuse it to prevent memory allocation.
b := make([]byte, l)

// record the start index.
cur := 0
for i := range rNodes {
bAddr := utils.StringToSliceByte(rNodes[i].Ins.Address().String())
// Assign the first few bits of b to string.
copy(b, bAddr)

// Initialize the last few bits, skipping '#'.
for j := len(bAddr) + 1; j < len(b); j++ {
b[j] = 0
}
b[len(bAddr)] = '#'

vLen := cb.getVirtualNodeLen(rNodes[i])
for j := 0; j < vLen; j++ {
k := j
cnt := 0
// Assign values to b one by one, starting with the last one.
for k > 0 {
b[l-1-cnt] = byte(k % 10)
k /= 10
cnt++
}
// At this point, the index inside ret should be cur + j.
index := cur + j
ret[index].hash = xxhash3.Hash(b)
ret[index].RealNode = &rNodes[i]
}
cur += vLen
}
sort.Sort(&vNodeType{s: ret})
return ret
func (cb *consistBalancer) newConsistInfo(e discovery.Result) *newconsist.ConsistInfo {
return newconsist.NewConsistInfo(e, newconsist.ConsistInfoConfig{VirtualFactor: cb.opt.VirtualFactor, Weighted: cb.opt.Weighted})
}

// get virtual node number from one realNode.
Expand All @@ -364,9 +296,11 @@ func (cb *consistBalancer) Rebalance(change discovery.Change) {
if !change.Result.Cacheable {
return
}
// TODO: Use TreeMap to optimize performance when updating.
// Now, due to the lack of a good red-black tree implementation, we can only build the full amount once per update.
cb.updateConsistInfo(change.Result)
if ci, ok := cb.cachedConsistInfo.Load(change.Result.CacheKey); ok {
ci.(*newconsist.ConsistInfo).Rebalance(change)
} else {
cb.updateConsistInfo(change.Result)
}
}

// Delete implements the Rebalancer interface.
Expand Down
157 changes: 128 additions & 29 deletions pkg/loadbalance/consist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ package loadbalance
import (
"context"
"fmt"
"github.com/bytedance/gopkg/lang/fastrand"
"github.com/cloudwego/kitex/pkg/loadbalance/newconsist"
"math/rand"
"runtime"
"strconv"
"strings"
"testing"
"time"

"github.com/bytedance/gopkg/lang/fastrand"

"github.com/cloudwego/kitex/internal"
"github.com/cloudwego/kitex/internal/test"
Expand Down Expand Up @@ -53,13 +53,12 @@ func getRandomKey(ctx context.Context, request interface{}) string {
return key
}

func getRandomString(length int) string {
func getRandomString(r *rand.Rand, length int) string {
var resBuilder strings.Builder
resBuilder.Grow(length)
corpus := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
rand.Seed(time.Now().UnixNano() + int64(100))
for i := 0; i < length; i++ {
resBuilder.WriteByte(corpus[rand.Intn(len(corpus))])
resBuilder.WriteByte(corpus[r.Intn(len(corpus))])
}
return resBuilder.String()
}
Expand Down Expand Up @@ -166,20 +165,29 @@ func TestConsistPicker_Next_NoCache_Consist(t *testing.T) {
CacheKey: "",
Instances: insList,
}
opt.GetKey = func(ctx context.Context, request interface{}) string {
v := ctx.Value("key")
return v.(string)
}

cnt := make(map[string]int)
for _, ins := range insList {
cnt[ins.Address().String()] = 0
}
cnt["null"] = 0

cb := NewConsistBalancer(opt)
picker := cb.GetPicker(e)
ins := picker.Next(context.TODO(), nil)
for i := 0; i < 100; i++ {
picker := cb.GetPicker(e)
test.Assert(t, picker.Next(context.TODO(), nil) == ins)
for i := 0; i < 100000; i++ {
ctx := context.WithValue(context.Background(), "key", strconv.Itoa(i))
if res := picker.Next(ctx, nil); res != nil {
cnt[res.Address().String()]++
} else {
cnt["null"]++
}
}
fmt.Println(cnt)

cb = NewConsistBalancer(opt)
for i := 0; i < 100; i++ {
picker := cb.GetPicker(e)
test.Assert(t, picker.Next(context.TODO(), nil) == ins)
}
}

func TestConsistPicker_Next_Cache(t *testing.T) {
Expand Down Expand Up @@ -315,7 +323,8 @@ func TestConsistPicker_Reblance(t *testing.T) {
picker := cb.GetPicker(e)
key := strconv.Itoa(i)
ctx = context.WithValue(ctx, keyCtxKey, key)
test.DeepEqual(t, record[key], picker.Next(ctx, nil))
res := picker.Next(ctx, nil)
test.DeepEqual(t, record[key], res)
}
}

Expand Down Expand Up @@ -388,36 +397,126 @@ func BenchmarkNewConsistPicker(bb *testing.B) {
// BenchmarkConsistPicker_RandomDistributionKey/1000ins-12 2848216. 407.7 ns/op 48 B/op 1 allocs/op
// BenchmarkConsistPicker_RandomDistributionKey/10000ins
// BenchmarkConsistPicker_RandomDistributionKey/10000ins-12 2701766 492.7 ns/op 48 B/op 1 allocs/op
func BenchmarkConsistPicker_RandomDistributionKey(bb *testing.B) {
func BenchmarkConsistPicker_RandomDistributionKey(b *testing.B) {
n := 10
balancer := NewConsistBalancer(NewConsistentHashOption(getRandomKey))

for i := 0; i < 4; i++ {
bb.Run(fmt.Sprintf("%dins", n), func(b *testing.B) {
b.Run(fmt.Sprintf("%dins", n), func(b *testing.B) {
r := rand.New(rand.NewSource(int64(n)))
inss := makeNInstances(n, 10)
e := discovery.Result{
Cacheable: true,
CacheKey: "test",
Instances: inss,
}
b.ReportAllocs()
b.ResetTimer()
picker := balancer.GetPicker(e)
ctx := context.WithValue(context.Background(), keyCtxKey, getRandomString(30))
ctx := context.WithValue(context.Background(), keyCtxKey, getRandomString(r, 30))
picker.Next(ctx, nil)
picker.(internal.Reusable).Recycle()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.Logf("round %d", i)
b.StopTimer()
ctx = context.WithValue(context.Background(), keyCtxKey, getRandomString(30))
b.StartTimer()
picker := balancer.GetPicker(e)
for j := 0; j < b.N; j++ {
ctx = context.WithValue(context.Background(), keyCtxKey, getRandomString(r, 30))
picker = balancer.GetPicker(e)
picker.Next(ctx, nil)
if r, ok := picker.(internal.Reusable); ok {
r.Recycle()
if toRecycle, ok := picker.(internal.Reusable); ok {
toRecycle.Recycle()
}
}
})
n *= 10
}
}

func BenchmarkRebalance(bb *testing.B) {
weight := 10
nums := 10000

for n := 0; n < 1; n++ {
bb.Run(fmt.Sprintf("consist-remove-%d", nums), func(b *testing.B) {
insList := makeNInstances(nums, weight)
e := discovery.Result{
Cacheable: true,
CacheKey: "",
Instances: insList,
}
newConsist := newconsist.NewConsistInfo(e, newconsist.ConsistInfoConfig{
VirtualFactor: 100,
Weighted: true,
})

b.ReportAllocs()
b.ResetTimer()
change := discovery.Change{
Result: e,
Added: nil,
Updated: nil,
}
removed := []discovery.Instance{insList[0]}
for i := 0; i < nums; i++ {
e.Instances = insList[i+1:]
removed[0] = insList[i]
change.Result = e
change.Removed = removed
newConsist.Rebalance(change)
}
runtime.GC()
})

bb.Run(fmt.Sprintf("consist-add-%d", nums), func(b *testing.B) {
insList := makeNInstances(nums, weight)
e := discovery.Result{
Cacheable: true,
CacheKey: "",
Instances: insList,
}
newConsist := newconsist.NewConsistInfo(e, newconsist.ConsistInfoConfig{
VirtualFactor: 100,
Weighted: true,
})

b.ReportAllocs()
b.ResetTimer()
change := discovery.Change{
Result: e,
Added: nil,
Updated: nil,
}
added := []discovery.Instance{insList[0]}
for i := 0; i < nums; i++ {
e.Instances = insList[:i+1]
added[0] = insList[i]
change.Result = e
change.Added = added
newConsist.Rebalance(change)
}
runtime.GC()
})
nums *= 10
}

}

func BenchmarkNewConsistInfo(b *testing.B) {
weight := 10
nums := 10
for n := 0; n < 4; n++ {
b.Run(fmt.Sprintf("new-consist-%d", nums), func(b *testing.B) {
insList := makeNInstances(nums, weight)
e := discovery.Result{
Cacheable: false,
CacheKey: "",
Instances: insList,
}
b.ResetTimer()
b.ReportAllocs()
newConsist := newconsist.NewConsistInfo(e, newconsist.ConsistInfoConfig{
VirtualFactor: 100,
Weighted: true,
})
_ = newConsist
})
nums *= 10
}
}
Loading

0 comments on commit 6d92cfb

Please sign in to comment.