Commit
Merge pull request #612 from ploubser/issue_596
(#596) Update balancer to use Placement preferred
ripienaar authored Jan 16, 2025
2 parents e2f9c88 + df65ad5 commit 4933286
Showing 2 changed files with 72 additions and 90 deletions.
156 changes: 69 additions & 87 deletions balancer/balancer.go
@@ -29,10 +29,11 @@ import (
// streams or consumers, and then determine an even distribution. If any of the
// servers is the leader for more than the even distribution, the balancer will
// step down a number of streams/consumers until the even distribution is met.
- // Which streams/consumers are stepped down is determined randomly.
- // If stepping down fails, or if the same server is elected the leader again,
- // we will move on the next randomly selected server. If we get a second, similar
- // failure the Balancer will return an error.
+ // Which streams/consumers are stepped down is determined randomly. We use
+ // preferred placement to move the leadership to a server with less than
+ // the even distribution. If stepping down fails, we will move on the next
+ // randomly selected server. If we get a second, similar failure the Balancer
+ // will return an error.
type Balancer struct {
nc *nats.Conn
log api.Logger
@@ -45,54 +46,43 @@ type balanceEntity interface {
}

type peer struct {
- hostname string
- entities []balanceEntity
- leaderCount int
- rebalance int
+ name string
+ entities []balanceEntity
+ offset int
}

// New returns a new instance of the Balancer
func New(nc *nats.Conn, log api.Logger) (*Balancer, error) {
- return &Balancer{
- nc: nc,
- log: log,
- }, nil
- }
+ mgr, err := jsm.New(nc)
+ if err != nil {
+ return nil, err
+ }

- func (b *Balancer) updateServersWithExclude(servers map[string]*peer, exclude string) (map[string]*peer, error) {
- updated := map[string]*peer{}
- var err error
+ apiLvl, err := mgr.MetaApiLevel(true)
+ if err != nil {
+ return nil, err
+ }

- for _, s := range servers {
- if s.hostname == exclude {
- continue
- }
- for _, e := range s.entities {
- updated, err = b.mapEntityToServers(e, updated)
- if err != nil {
- return updated, err
- }
- }
+ if apiLvl < 1 {
+ return nil, fmt.Errorf("invalid server version. Balancer requires server version 2.11.0 or higher")
}

- return updated, nil
+ return &Balancer{
+ nc: nc,
+ log: log,
+ }, nil
}

- func (b *Balancer) getOvers(server map[string]*peer, evenDistribution int) {
- for _, s := range server {
- if s.leaderCount == 0 {
- continue
- }
-
- if over := s.leaderCount - evenDistribution; over > 0 {
- s.rebalance = over
- }
+ func (b *Balancer) calcOffset(server *map[string]*peer, evenDistribution int) {
+ for _, s := range *server {
+ s.offset = len(s.entities) - evenDistribution
}
}

+ // We consider a balanced stream to be one with an offset of 0 or less
func (b *Balancer) isBalanced(servers map[string]*peer) bool {
for _, s := range servers {
- if s.rebalance > 0 {
+ if s.offset > 0 {
return false
}
}
@@ -110,22 +100,19 @@ func (b *Balancer) mapEntityToServers(entity balanceEntity, serverMap map[string
_, ok := serverMap[leaderName]
if !ok {
tmp := peer{
- hostname: leaderName,
- entities: []balanceEntity{},
- leaderCount: 0,
+ name: leaderName,
+ entities: []balanceEntity{},
}
serverMap[leaderName] = &tmp
}
serverMap[leaderName].entities = append(serverMap[leaderName].entities, entity)
- serverMap[leaderName].leaderCount += 1

for _, replica := range info.Replicas {
_, ok = serverMap[replica.Name]
if !ok {
tmp := peer{
- hostname: replica.Name,
- entities: []balanceEntity{},
- leaderCount: 0,
+ name: replica.Name,
+ entities: []balanceEntity{},
}
serverMap[replica.Name] = &tmp
}
@@ -139,58 +126,51 @@ func (b *Balancer) calcDistribution(entities, servers int) int {
return int(math.Floor(evenDistributionf + 0.5))
}

- func (b *Balancer) balance(servers map[string]*peer, evenDistribution int) (int, error) {
- var err error
+ func (b *Balancer) balance(servers map[string]*peer, evenDistribution int, typeHint string) (int, error) {
+ //var err error
steppedDown := 0

for !b.isBalanced(servers) {
for _, s := range servers {
- // skip servers that aren't leaders
- if s.leaderCount == 0 {
- continue
- }
-
- if s.rebalance > 0 {
- b.log.Infof("Found server '%s' with %d entities over the even distribution\n", s.hostname, s.rebalance)
- // Now we have to kick a random selection of streams where number = rebalance
+ if s.offset > 0 {
+ b.log.Infof("Found server '%s' with offset of %d. Rebalancing", s.name, s.offset)
retries := 0
- for i := 0; i < s.rebalance; i++ {
+ for i := 0; i <= s.offset; i++ {
// find a random stream (or consumer) to move to another server
randomIndex := rand.Intn(len(s.entities))
entity := s.entities[randomIndex]
if entity == nil {
return steppedDown, fmt.Errorf("no more valid entities to balance")
}
b.log.Infof("Requesting leader (%s) step down for %s", s.hostname, entity.Name())

err = entity.LeaderStepDown()
if err != nil {
b.log.Errorf("Unable to step down leader for %s - %s", entity.Name(), err)
// If we failed to step down the stream, decrement the iterator so that we don't kick one too few
// Limit this to one retry, if we can't step down multiple leaders something is wrong
if retries == 0 {
i--
retries++

for _, ns := range servers {
if ns.offset < 0 {
b.log.Infof("Requesting leader '%s' step down for %s '%s'. New preferred leader is %s.", s.name, typeHint, entity.Name(), ns.name)
placement := api.Placement{Preferred: ns.name}
err := entity.LeaderStepDown(&placement)
if err != nil {
b.log.Errorf("Unable to step down leader for %s - %s", entity.Name(), err)
// If we failed to step down the stream, decrement the iterator so that we don't kick one too few
// Limit this to one retry, if we can't step down multiple leaders something is wrong
if retries == 0 {
i--
retries++
s.entities = slices.Delete(s.entities, randomIndex, randomIndex+1)
break
}
return 0, err
}
b.log.Infof("Successful step down for %s '%s'", typeHint, entity.Name())
retries = 0
steppedDown += 1
ns.offset += 1
s.offset -= 1
s.entities = slices.Delete(s.entities, randomIndex, randomIndex+1)
- continue
+ break
}
- return 0, err
}
-
- b.log.Infof("Successful step down '%s'", entity.Name())
- retries = 0
- s.entities = slices.Delete(s.entities, randomIndex, randomIndex+1)
- steppedDown += 1
}
-
- // finally, if we rebalanced a server we update the servers list and start again, excluding the one we just rebalanced
- servers, err = b.updateServersWithExclude(servers, s.hostname)
- if err != nil {
- return steppedDown, err
- }
- b.getOvers(servers, evenDistribution)
- break
}
- }
+ // We recalculate the offset count, we can't be 100% sure entities moved to their preferred server
+ b.calcOffset(&servers, evenDistribution)
}

return steppedDown, nil
@@ -212,9 +192,10 @@ func (b *Balancer) BalanceStreams(streams []*jsm.Stream) (int, error) {
b.log.Debugf("found %d streams on %d servers\n", len(streams), len(servers))
evenDistribution := b.calcDistribution(len(streams), len(servers))
b.log.Debugf("even distribution is %d\n", evenDistribution)
- b.getOvers(servers, evenDistribution)
+ b.log.Debugf("calculating offset for each server")
+ b.calcOffset(&servers, evenDistribution)

- return b.balance(servers, evenDistribution)
+ return b.balance(servers, evenDistribution, "stream")
}

// BalanceConsumers finds the expected distribution of consumer leaders over servers
@@ -233,7 +214,8 @@ func (b *Balancer) BalanceConsumers(consumers []*jsm.Consumer) (int, error) {
b.log.Debugf("found %d consumers on %d servers\n", len(consumers), len(servers))
evenDistribution := b.calcDistribution(len(consumers), len(servers))
b.log.Debugf("even distribution is %d\n", evenDistribution)
- b.getOvers(servers, evenDistribution)
+ b.log.Debugf("calculating offset for each server")
+ b.calcOffset(&servers, evenDistribution)

- return b.balance(servers, evenDistribution)
+ return b.balance(servers, evenDistribution, "consumer")
}
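
The change above replaces the old blind leader step-down with a step-down that names a preferred successor. For reference, here is a minimal, hypothetical sketch of issuing the same kind of preferred-placement step down by hand with jsm.go. The stream name ORDERS and peer name n3-c1 are placeholders, and it assumes the jsm.go API surface this commit relies on (an api.Placement with a Preferred peer passed to LeaderStepDown) and a server new enough to honour it (2.11+, matching the MetaApiLevel check added to New).

```go
package main

import (
	"log"

	"github.com/nats-io/jsm.go"
	"github.com/nats-io/jsm.go/api"
	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	mgr, err := jsm.New(nc)
	if err != nil {
		log.Fatal(err)
	}

	// Stream name is a placeholder for this sketch.
	stream, err := mgr.LoadStream("ORDERS")
	if err != nil {
		log.Fatal(err)
	}

	// Ask the current leader to step down with a preferred successor,
	// the same call the balancer now issues for each entity it moves.
	placement := &api.Placement{Preferred: "n3-c1"}
	if err := stream.LeaderStepDown(placement); err != nil {
		log.Fatal(err)
	}
	log.Println("step down requested")
}
```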
6 changes: 3 additions & 3 deletions balancer/balancer_test.go
@@ -18,7 +18,7 @@ import (
func TestBalanceStream(t *testing.T) {
withJSCluster(t, 3, func(t *testing.T, servers []*server.Server, nc *nats.Conn, mgr *jsm.Manager) error {
streams := []*jsm.Stream{}
- for i := 1; i <= 10; i++ {
+ for i := 1; i < 10; i++ {
streamName := fmt.Sprintf("tests%d", i)
subjects := fmt.Sprintf("tests%d.*", i)
s, err := mgr.NewStream(streamName, jsm.Subjects(subjects), jsm.MemoryStorage(), jsm.Replicas(3))
@@ -62,7 +62,7 @@ func TestBalanceConsumer(t *testing.T) {
defer s.Delete()

consumers := []*jsm.Consumer{}
- for i := 1; i <= 10; i++ {
+ for i := 1; i < 10; i++ {
consumerName := fmt.Sprintf("testc%d", i)
c, err := mgr.NewConsumer("TEST_CONSUMER_BALANCE", jsm.ConsumerName(consumerName))
if err != nil {
@@ -154,7 +154,7 @@ func withJSCluster(t *testing.T, retries int, cb func(*testing.T, []*server.Serv
}
defer nc.Close()

- mgr, err := jsm.New(nc, jsm.WithTimeout(time.Second))
+ mgr, err := jsm.New(nc, jsm.WithTimeout(5*time.Second))
if err != nil {
t.Fatalf("manager creation failed: %s", err)
}
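
The updated tests create nine replicated streams on a three-node cluster, so the even share works out to three leaders per server. Below is a small, self-contained sketch of the arithmetic the balancer uses: calcDistribution rounds entities/servers half up, and calcOffset subtracts that share from each server's leader count. The initial leader counts are made up for illustration.

```go
package main

import (
	"fmt"
	"math"
)

// evenDistribution mirrors Balancer.calcDistribution: entities/servers, rounded half up.
func evenDistribution(entities, servers int) int {
	return int(math.Floor(float64(entities)/float64(servers) + 0.5))
}

func main() {
	// Hypothetical starting point: 9 stream leaders spread unevenly over 3 servers.
	leaders := map[string]int{"n1": 5, "n2": 3, "n3": 1}

	even := evenDistribution(9, 3) // 3

	// Offset per server, as in calcOffset: servers with a positive offset shed
	// leaders via preferred-placement step downs, negative offsets receive them.
	for name, count := range leaders {
		fmt.Printf("%s: %d leaders, offset %+d\n", name, count, count-even)
	}
	// The cluster counts as balanced once no server has a positive offset.
}
```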
