feat: new raft leader balancing algorithm (#390)
* feat: new raft leader balancing algorithm

Signed-off-by: wenfeng <[email protected]>

* update vanus.yml

Signed-off-by: wenfeng <[email protected]>

* fix lint & ut

Signed-off-by: wenfeng <[email protected]>

Signed-off-by: wenfeng <[email protected]>
wenfengwang authored Jan 6, 2023
1 parent 060b7af commit 5a2e6f2
Showing 14 changed files with 312 additions and 77 deletions.
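At a high level, this change balances Raft leaders across volumes when segments are created: the first segment of an eventlog places its leader block on the volume that currently leads the fewest working segments (see whichIsLeader in eventlog.go below), while every later segment of the same eventlog is allocated on the same volumes as its predecessor and keeps its leader on the predecessor's leader volume. As an editor's orientation sketch only, with illustrative names rather than code from this commit, the core balancing step amounts to a least-loaded pick:

package main

import (
	"fmt"
	"sort"
)

// pickLeastLoaded returns the candidate volume that currently leads the
// fewest working segments; ties fall back to the smaller volume ID so the
// result stays deterministic (this tie-break is illustrative).
func pickLeastLoaded(candidates []uint64, leaders map[uint64]int) uint64 {
	sorted := append([]uint64(nil), candidates...)
	sort.Slice(sorted, func(i, j int) bool {
		if leaders[sorted[i]] != leaders[sorted[j]] {
			return leaders[sorted[i]] < leaders[sorted[j]]
		}
		return sorted[i] < sorted[j]
	})
	return sorted[0]
}

func main() {
	// Volume 1 already leads two working segments, volume 2 leads one,
	// volume 3 leads none, so the next leader lands on volume 3.
	leaders := map[uint64]int{1: 2, 2: 1, 3: 0}
	fmt.Println(pickLeastLoaded([]uint64{1, 2, 3}, leaders)) // prints 3
}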
27 changes: 23 additions & 4 deletions internal/controller/eventbus/block/block.go
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate mockgen -source=block.go -destination=mock_block.go -package=block
package block

import (
@@ -23,6 +24,7 @@

"github.com/huandu/skiplist"
"github.com/linkall-labs/vanus/internal/controller/eventbus/metadata"
"github.com/linkall-labs/vanus/internal/controller/eventbus/server"
"github.com/linkall-labs/vanus/internal/kv"
"github.com/linkall-labs/vanus/internal/primitive/vanus"
"github.com/linkall-labs/vanus/observability/log"
@@ -38,6 +40,7 @@ const (
type Allocator interface {
Run(ctx context.Context, kvCli kv.Client, dynamicAllocate bool) error
Pick(ctx context.Context, num int) ([]*metadata.Block, error)
PickByVolumes(ctx context.Context, volumes []vanus.ID) ([]*metadata.Block, error)
Stop()
}

@@ -66,6 +69,18 @@ type allocator struct {
blockCapacity int64
}

func (al *allocator) PickByVolumes(ctx context.Context, volumes []vanus.ID) ([]*metadata.Block, error) {
instances := make([]server.Instance, len(volumes))
for idx := range volumes {
i := al.selector.SelectByID(volumes[idx])
if i == nil {
return nil, errors.ErrVolumeInstanceNoServer
}
instances[idx] = i
}
return al.pick(ctx, instances)
}

func (al *allocator) Run(ctx context.Context, kvCli kv.Client, startDynamicAllocate bool) error {
al.kvClient = kvCli
pairs, err := al.kvClient.List(ctx, metadata.BlockKeyPrefixInKVStore)
@@ -99,15 +114,19 @@ func (al *allocator) Run(ctx context.Context, kvCli kv.Client, startDynamicAlloc
func (al *allocator) Pick(ctx context.Context, num int) ([]*metadata.Block, error) {
al.mutex.Lock()
defer al.mutex.Unlock()
blockArr := make([]*metadata.Block, num)

instances := al.selector.Select(num, al.blockCapacity)
if len(instances) == 0 {
return nil, errors.ErrVolumeInstanceNotFound
}
for idx := 0; idx < num; idx++ {
ins := instances[idx]

return al.pick(ctx, instances)
}

func (al *allocator) pick(ctx context.Context, volumes []server.Instance) ([]*metadata.Block, error) {
blockArr := make([]*metadata.Block, len(volumes))
for idx := range volumes {
var skipList *skiplist.SkipList
ins := volumes[idx]
v, exist := al.volumeBlockBuffer.Load(ins.GetMeta().ID.Key())
var err error
var block *metadata.Block
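The new PickByVolumes resolves each requested volume ID to its live server instance through the selector, returns ErrVolumeInstanceNoServer if any volume cannot be resolved, and then reuses the same pick helper as Pick. A rough caller-side sketch of the intended use, written as if it lived in the block package (allocateOnSameVolumes is an editor's name, but it mirrors what generateSegment does later in this diff):

package block

import (
	"context"

	"github.com/linkall-labs/vanus/internal/controller/eventbus/metadata"
	"github.com/linkall-labs/vanus/internal/primitive/vanus"
)

// allocateOnSameVolumes requests one new block on each volume that already
// hosts the previous segment's peer blocks, so all segments of an eventlog
// stay co-located on the same SegmentServers.
func allocateOnSameVolumes(ctx context.Context, alloc Allocator,
	peers map[uint64]*metadata.Block) ([]*metadata.Block, error) {
	volumes := make([]vanus.ID, 0, len(peers))
	for _, blk := range peers {
		volumes = append(volumes, blk.VolumeID)
	}
	return alloc.PickByVolumes(ctx, volumes)
}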
16 changes: 16 additions & 0 deletions internal/controller/eventbus/block/mock_block.go

Some generated files are not rendered by default.

6 changes: 6 additions & 0 deletions internal/controller/eventbus/block/selector.go
@@ -15,11 +15,14 @@
package block

import (
"context"
"fmt"
"sort"
"sync"

"github.com/linkall-labs/vanus/internal/controller/eventbus/server"
"github.com/linkall-labs/vanus/internal/primitive/vanus"
"github.com/linkall-labs/vanus/observability/log"
)

// VolumeSelector selector for Block creating. The implementation based on different algorithm, typical
@@ -83,6 +86,9 @@ func (s *volumeRoundRobinSelector) Select(num int, size int64) []server.Instance
for idx := 0; idx < num; idx++ {
instances = append(instances, m[keys[(s.count+int64(idx))%int64(len(keys))]])
}
log.Info(context.TODO(), "picked instances", map[string]interface{}{
"instances": fmt.Sprintf("%v", instances),
})
s.count++
return instances
}
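For context on the logging added above: the round-robin selector walks a sorted list of instance keys with a moving offset, so consecutive calls spread new blocks across volumes. A stripped-down sketch of that indexing (illustrative names, not the selector's actual code):

// roundRobin returns num keys starting at offset and wrapping around the
// slice, the same modular walk volumeRoundRobinSelector.Select performs.
func roundRobin(keys []string, offset int64, num int) []string {
	out := make([]string, 0, num)
	for idx := 0; idx < num; idx++ {
		out = append(out, keys[(offset+int64(idx))%int64(len(keys))])
	}
	return out
}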
110 changes: 86 additions & 24 deletions internal/controller/eventbus/eventlog/eventlog.go
@@ -18,7 +18,9 @@ package eventlog
import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"sort"
"sync"
"time"

@@ -92,6 +94,7 @@ type eventlogManager struct
cleanInterval time.Duration
checkSegmentExpiredInterval time.Duration
segmentExpiredTime time.Duration
createSegmentMutex sync.Mutex
}

func NewManager(volMgr volume.Manager, replicaNum uint, defaultBlockSize int64) Manager {
@@ -496,12 +499,6 @@ func (mgr *eventlogManager) dynamicScaleUpEventLog(ctx context.Context) {
})
count++
}
/* log too many
log.Debug(ctx, "scale task completed", map[string]interface{}{
"segment_created": count,
"eventlog_id": el.md.ID.String(),
})
*/
return true
})
}
@@ -648,7 +645,10 @@ func (mgr *eventlogManager) checkSegmentExpired(ctx context.Context) {
}

func (mgr *eventlogManager) createSegment(ctx context.Context, el *eventlog) (*Segment, error) {
seg, err := mgr.generateSegment(ctx)
mgr.createSegmentMutex.Lock()
defer mgr.createSegmentMutex.Unlock()

seg, err := mgr.generateSegment(ctx, el)
defer func() {
// preparing to cleaning
if err != nil {
@@ -665,26 +665,49 @@
}
seg.EventLogID = el.md.ID

for i := range seg.Replicas.Peers {
seg.Replicas.Leader = i
ins := mgr.volMgr.GetVolumeInstanceByID(seg.GetLeaderBlock().VolumeID)
srv := ins.GetServer()
if srv == nil {
return nil, errors.ErrVolumeInstanceNoServer
cur := el.currentAppendableSegment()
if cur == nil {
blk, err := mgr.whichIsLeader(seg.Replicas.Peers)
if err != nil {
return nil, err
}
_, err = srv.GetClient().ActivateSegment(ctx, &segment.ActivateSegmentRequest{
EventLogId: seg.EventLogID.Uint64(),
ReplicaGroupId: seg.Replicas.ID.Uint64(),
Replicas: mgr.getSegmentTopology(ctx, seg),
})
if err == nil {
break
seg.Replicas.Leader = blk.ID.Uint64()
} else {
set := false
for _, blk := range seg.Replicas.Peers {
if blk.VolumeID.Equals(cur.GetLeaderBlock().VolumeID) {
seg.Replicas.Leader = blk.ID.Uint64()
set = true
break
}
}
if !set {
log.Error(ctx, "failed to set leader block", map[string]interface{}{
"eventlog": el.md.ID.Key(),
"eventbus": el.md.EventbusName,
"previous_segment_volume_id": cur.GetLeaderBlock().VolumeID,
"replicas": fmt.Sprintf("%v", seg.Replicas),
"segment": seg.ID.Key(),
})
return nil, errors.ErrInvalidSegment
}
}

ins := mgr.volMgr.GetVolumeInstanceByID(seg.GetLeaderBlock().VolumeID)
srv := ins.GetServer()
if srv == nil {
return nil, errors.ErrVolumeInstanceNoServer
}
_, err = srv.GetClient().ActivateSegment(ctx, &segment.ActivateSegmentRequest{
EventLogId: seg.EventLogID.Uint64(),
ReplicaGroupId: seg.Replicas.ID.Uint64(),
Replicas: mgr.getSegmentTopology(ctx, seg),
})

if err != nil {
log.Warning(context.TODO(), "activate segment failed", map[string]interface{}{
log.KeyError: err,
})
}
if err != nil {
return nil, err
}
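// --- editor's note, not part of this commit ---------------------------------
// The leader choice above has two branches: when the eventlog has no
// appendable segment yet, whichIsLeader picks the block whose volume
// currently leads the fewest working segments; otherwise the new segment's
// leader is pinned to the block sitting on the same volume as the current
// segment's leader, and creation fails with ErrInvalidSegment if no such
// block exists. A condensed sketch (chooseLeader is an invented helper name;
// Segment and metadata.Block are the types used above):
func chooseLeader(prev *Segment, peers map[uint64]*metadata.Block,
	leastLoaded *metadata.Block) (leaderBlockID uint64, ok bool) {
	if prev == nil {
		return leastLoaded.ID.Uint64(), true
	}
	for _, blk := range peers {
		if blk.VolumeID.Equals(prev.GetLeaderBlock().VolumeID) {
			return blk.ID.Uint64(), true
		}
	}
	return 0, false // the caller maps this to ErrInvalidSegment
}
// --- end of editor's note ----------------------------------------------------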

@@ -720,9 +743,22 @@ func (mgr *eventlogManager) createSegment(ctx context.Context, el *eventlog) (*S
return seg, nil
}

func (mgr *eventlogManager) generateSegment(ctx context.Context) (*Segment, error) {
func (mgr *eventlogManager) generateSegment(ctx context.Context, el *eventlog) (*Segment, error) {
var seg *Segment
blocks, err := mgr.allocator.Pick(ctx, int(mgr.segmentReplicaNum))
cur := el.currentAppendableSegment()
var blocks []*metadata.Block
var err error
if cur == nil {
blocks, err = mgr.allocator.Pick(ctx, int(mgr.segmentReplicaNum))
} else {
// make sure segments of one eventlog located in one SegmentServer
volumes := make([]vanus.ID, 0)
for _, peer := range cur.Replicas.Peers {
volumes = append(volumes, peer.VolumeID)
}
blocks, err = mgr.allocator.PickByVolumes(ctx, volumes)
}

if err != nil {
return nil, err
}
@@ -775,6 +811,32 @@ func (mgr *eventlogManager) generateSegment(ctx context.Context) (*Segment, erro
return seg, nil
}

func (mgr *eventlogManager) whichIsLeader(raftGroup map[uint64]*metadata.Block) (*metadata.Block, error) {
countMap := map[uint64]int{}
mgr.globalSegmentMap.Range(func(key, value any) bool {
seg, _ := value.(*Segment)
if seg.State == StateWorking {
vID := seg.GetLeaderBlock().VolumeID.Uint64()
count := countMap[vID]
countMap[vID] = count + 1
}
return true
})

volumeMap := make(map[uint64]*metadata.Block, len(raftGroup))
orderArr := make([]uint64, 0)
for _, node := range raftGroup {
volumeMap[node.VolumeID.Uint64()] = node
orderArr = append(orderArr, node.VolumeID.Uint64())
}
sort.Slice(orderArr, func(i, j int) bool {
return countMap[orderArr[i]] < countMap[orderArr[j]] ||
orderArr[i] < orderArr[j]
})

return volumeMap[orderArr[0]], nil
}
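// --- editor's note, not part of this commit ---------------------------------
// The comparator above joins the count comparison and the volume-ID
// comparison with a plain ||, so the ID tie-break also fires when the counts
// differ. That is not a strict weak ordering, and with sort.Slice the chosen
// volume can then depend on the (random) map iteration order that built
// orderArr. If the intent is "fewest leaders first, smaller volume ID on
// ties", a comparator along these lines would pin it down:
sort.Slice(orderArr, func(i, j int) bool {
	ci, cj := countMap[orderArr[i]], countMap[orderArr[j]]
	if ci != cj {
		return ci < cj // fewer working-segment leaders wins
	}
	return orderArr[i] < orderArr[j] // tie-break on the smaller volume ID
})
// --- end of editor's note ----------------------------------------------------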

type eventlog struct {
// uint64, *Segment
segmentList *skiplist.SkipList
63 changes: 62 additions & 1 deletion internal/controller/eventbus/eventlog/eventlog_test.go
@@ -186,6 +186,7 @@ func TestEventlogManager_ScaleSegmentTask(t *testing.T) {
ID: vanus.NewTestID(),
Capacity: 64 * 1024 * 1024 * 1024,
}
vanus.InitFakeSnowflake()
alloc.EXPECT().Run(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(nil)
alloc.EXPECT().Pick(gomock.Any(), 3).AnyTimes().DoAndReturn(func(ctx stdCtx.Context, num int) ([]*metadata.Block, error) {
return []*metadata.Block{
@@ -207,6 +208,26 @@
}, nil
})

alloc.EXPECT().PickByVolumes(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(func(ctx stdCtx.Context, volumes []vanus.ID) ([]*metadata.Block, error) {
return []*metadata.Block{
{
ID: vanus.NewTestID(),
Capacity: 64 * 1024 * 1024,
VolumeID: vol1.ID,
},
{
ID: vanus.NewTestID(),
Capacity: 64 * 1024 * 1024,
VolumeID: vol1.ID,
},
{
ID: vanus.NewTestID(),
Capacity: 64 * 1024 * 1024,
VolumeID: vol1.ID,
},
}, nil
})

volIns := server.NewMockInstance(ctrl)
volMgr.EXPECT().GetVolumeInstanceByID(vol1.ID).AnyTimes().Return(volIns)
srv := server.NewMockServer(ctrl)
@@ -285,6 +306,7 @@ func TestEventlogManager_CleanSegmentTask(t *testing.T) {
utMgr.volMgr = volMgr
kvCli := kv.NewMockClient(ctrl)
utMgr.kvClient = kvCli
vanus.InitFakeSnowflake()

ctx := stdCtx.Background()
kvCli.EXPECT().Set(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil)
@@ -314,6 +336,25 @@
},
}, nil
})
alloc.EXPECT().PickByVolumes(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(func(ctx stdCtx.Context, volumes []vanus.ID) ([]*metadata.Block, error) {
return []*metadata.Block{
{
ID: vanus.NewTestID(),
Capacity: 64 * 1024 * 1024,
VolumeID: vol1.ID,
},
{
ID: vanus.NewTestID(),
Capacity: 64 * 1024 * 1024,
VolumeID: vol1.ID,
},
{
ID: vanus.NewTestID(),
Capacity: 64 * 1024 * 1024,
VolumeID: vol1.ID,
},
}, nil
})

volIns := server.NewMockInstance(ctrl)
volMgr.EXPECT().GetVolumeInstanceByID(vol1.ID).AnyTimes().Return(volIns)
@@ -380,6 +421,7 @@ func TestEventlogManager_CreateAndGetEventlog(t *testing.T) {
kvCli := kv.NewMockClient(ctrl)
utMgr.kvClient = kvCli

vanus.InitFakeSnowflake()
ctx := stdCtx.Background()
kvCli.EXPECT().Set(gomock.Any(), gomock.Any(), gomock.Any()).Times(14).Return(nil)
alloc := block.NewMockAllocator(ctrl)
@@ -388,7 +430,26 @@
ID: vanus.NewTestID(),
Capacity: 64 * 1024 * 1024 * 1024,
}
alloc.EXPECT().Pick(ctx, 3).Times(2).DoAndReturn(func(ctx stdCtx.Context, num int) ([]*metadata.Block, error) {
alloc.EXPECT().Pick(ctx, 3).Times(1).DoAndReturn(func(ctx stdCtx.Context, num int) ([]*metadata.Block, error) {
return []*metadata.Block{
{
ID: vanus.NewTestID(),
Capacity: 64 * 1024 * 1024,
VolumeID: vol1.ID,
},
{
ID: vanus.NewTestID(),
Capacity: 64 * 1024 * 1024,
VolumeID: vol1.ID,
},
{
ID: vanus.NewTestID(),
Capacity: 64 * 1024 * 1024,
VolumeID: vol1.ID,
},
}, nil
})
alloc.EXPECT().PickByVolumes(gomock.Any(), gomock.Any()).Times(1).DoAndReturn(func(ctx stdCtx.Context, volumes []vanus.ID) ([]*metadata.Block, error) {
return []*metadata.Block{
{
ID: vanus.NewTestID(),
Diffs for the remaining changed files are not shown here.
