feat(coordinator): use internal address as unique identification for load balance (#565)

### Motivation

Currently, we use the full server address as the key for rebalancing, so a rebalance is triggered even when the user only changes the public address of a server.
By using the internal address as the unique identifier instead, the user can change the public address or other server information without triggering a node swap.


### Modification

- Introduce a `ServerContext` struct as the map value.
- Use the internal address as the filter key, i.e. the map key (see the sketch below).
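
As a rough illustration of the keying change (not the actual Oxia code: `ServerAddress` and the shard set below are simplified stand-ins for `model.ServerAddress` and `common.Set[int64]`, and the addresses are made up), keying the map by `Internal` keeps a node's identity stable while its public address changes:

```go
package main

import "fmt"

// Simplified stand-ins for model.ServerAddress and common.Set[int64];
// names and values here are illustrative only.
type ServerAddress struct {
	Public   string
	Internal string
}

type ServerContext struct {
	Addr   ServerAddress
	Shards map[int64]struct{}
}

func main() {
	s := ServerAddress{Public: "svc-0.public.example:6648", Internal: "svc-0.internal:6648"}

	// Key the map by the internal address rather than by the whole
	// ServerAddress struct, so a node's identity does not depend on
	// its public address.
	servers := map[string]ServerContext{
		s.Internal: {Addr: s, Shards: map[int64]struct{}{0: {}, 1: {}}},
	}

	// Changing only the public address leaves the map key, and therefore
	// the shard assignment, untouched: no node swap is needed.
	s.Public = "svc-0.new-public.example:6648"
	ctx := servers[s.Internal]
	ctx.Addr = s
	servers[s.Internal] = ctx

	fmt.Printf("servers=%d, shards on %s: %d\n",
		len(servers), s.Internal, len(servers[s.Internal].Shards))
}
```

With the previous keying by the full `model.ServerAddress` value, the same edit would have produced one "deleted" entry and one "new" entry, and `rebalanceCluster` would have emitted swap actions for it.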
mattisonchao authored Nov 10, 2024
1 parent 7283e47 commit 61f09cf
Showing 4 changed files with 70 additions and 48 deletions.
80 changes: 41 additions & 39 deletions coordinator/impl/cluster_rebalance.go
@@ -28,6 +28,11 @@ type SwapNodeAction struct {
To model.ServerAddress
}

type ServerContext struct {
Addr model.ServerAddress
Shards common.Set[int64]
}

// Make sure every server is assigned a similar number of shards
// Output a list of actions to be taken to rebalance the cluster.
func rebalanceCluster(servers []model.ServerAddress, currentStatus *model.ClusterStatus) []SwapNodeAction { //nolint:revive
@@ -49,11 +54,11 @@
}
if len(deletedServers) > 0 {
slog.Debug("Deleted servers: ")
for ds, shards := range deletedServers {
for id, context := range deletedServers {
slog.Debug(
"",
slog.String("server", ds.Internal),
slog.Int("count", shards.Count()),
slog.String("server", id),
slog.Int("count", context.Shards.Count()),
)
}
}
@@ -62,26 +67,26 @@
// First try to reassign shards from the removed servers.
// We do it one by one, by placing in the lead loaded server
if len(deletedServers) > 0 {
ds, shards := getFirstEntry(deletedServers)
id, context := getFirstEntry(deletedServers)

for j := serversCount - 1; j >= 0; j-- {
to := rankings[j]
eligibleShards := shards.Complement(to.Shards)
eligibleShards := context.Shards.Complement(to.Shards)

if !eligibleShards.IsEmpty() {
a := SwapNodeAction{
Shard: eligibleShards.GetSorted()[0],
From: ds,
From: context.Addr,
To: to.Addr,
}

shards.Remove(a.Shard)
if shards.IsEmpty() {
delete(deletedServers, ds)
context.Shards.Remove(a.Shard)
if context.Shards.IsEmpty() {
delete(deletedServers, id)
} else {
deletedServers[ds] = shards
deletedServers[id] = context
}
shardsPerServer[a.To].Add(a.Shard)
shardsPerServer[a.To.Internal].Shards.Add(a.Shard)

slog.Debug(
"Transfer from removed node",
@@ -117,8 +122,8 @@
To: leastLoaded.Addr,
}

shardsPerServer[a.From].Remove(a.Shard)
shardsPerServer[a.To].Add(a.Shard)
shardsPerServer[a.From.Internal].Shards.Remove(a.Shard)
shardsPerServer[a.To.Internal].Shards.Add(a.Shard)

slog.Debug(
"Swapping nodes",
@@ -132,49 +137,46 @@
}

func getShardsPerServer(servers []model.ServerAddress, currentStatus *model.ClusterStatus) (
existingServers map[model.ServerAddress]common.Set[int64],
deletedServers map[model.ServerAddress]common.Set[int64]) {
existingServers = map[model.ServerAddress]common.Set[int64]{}
deletedServers = map[model.ServerAddress]common.Set[int64]{}
existingServers map[string]ServerContext,
deletedServers map[string]ServerContext) {
existingServers = map[string]ServerContext{}
deletedServers = map[string]ServerContext{}

for _, s := range servers {
existingServers[s] = common.NewSet[int64]()
existingServers[s.Internal] = ServerContext{
Addr: s,
Shards: common.NewSet[int64](),
}
}

for _, nss := range currentStatus.Namespaces {
for shardId, shard := range nss.Shards {
for _, addr := range shard.Ensemble {
if _, ok := existingServers[addr]; ok {
existingServers[addr].Add(shardId)
for _, candidate := range shard.Ensemble {
if _, ok := existingServers[candidate.Internal]; ok {
existingServers[candidate.Internal].Shards.Add(shardId)
continue
}

// This server is getting removed
if _, ok := deletedServers[addr]; !ok {
deletedServers[addr] = common.NewSet[int64]()
if _, ok := deletedServers[candidate.Internal]; !ok {
deletedServers[candidate.Internal] = ServerContext{
Addr: candidate,
Shards: common.NewSet[int64]()}
}

deletedServers[addr].Add(shardId)
deletedServers[candidate.Internal].Shards.Add(shardId)
}
}
}

return existingServers, deletedServers
}

type ServerRank struct {
Addr model.ServerAddress
Shards common.Set[int64]
}

func getServerRanking(shardsPerServer map[model.ServerAddress]common.Set[int64]) []ServerRank {
res := make([]ServerRank, 0)
func getServerRanking(shardsPerServer map[string]ServerContext) []ServerContext {
res := make([]ServerContext, 0)

for server, shards := range shardsPerServer {
res = append(res, ServerRank{
Addr: server,
Shards: shards,
})
for _, context := range shardsPerServer {
res = append(res, context)
}

// Rank the servers from the one with most shards to the one with the least
@@ -191,14 +193,14 @@ func getServerRanking(shardsPerServer map[model.ServerAddress]common.Set[int64])
return res
}

func getFirstEntry(m map[model.ServerAddress]common.Set[int64]) (model.ServerAddress, common.Set[int64]) {
keys := make([]model.ServerAddress, 0, len(m))
func getFirstEntry(m map[string]ServerContext) (string, ServerContext) {
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}

sort.SliceStable(keys, func(i, j int) bool {
return keys[i].Internal < keys[j].Internal
return keys[i] < keys[j]
})

return keys[0], m[keys[0]]
29 changes: 22 additions & 7 deletions coordinator/impl/cluster_rebalance_test.go
@@ -73,15 +73,30 @@ func TestClusterRebalance_Count(t *testing.T) {

count, deletedServers := getShardsPerServer([]model.ServerAddress{s1, s2, s3, s4, s5}, cs)

assert.Equal(t, map[model.ServerAddress]common.Set[int64]{
s1: common.NewSetFrom[int64]([]int64{0, 1, 2}),
s2: common.NewSetFrom[int64]([]int64{0, 1}),
s3: common.NewSetFrom[int64]([]int64{0, 2}),
s4: common.NewSetFrom[int64]([]int64{1, 2}),
s5: common.NewSet[int64](),
assert.Equal(t, map[string]ServerContext{
s1.Internal: {
s1,
common.NewSetFrom[int64]([]int64{0, 1, 2}),
},
s2.Internal: {
s2,
common.NewSetFrom[int64]([]int64{0, 1}),
},
s3.Internal: {
s3,
common.NewSetFrom[int64]([]int64{0, 2}),
},
s4.Internal: {
s4,
common.NewSetFrom[int64]([]int64{1, 2}),
},
s5.Internal: {
s5,
common.NewSet[int64](),
},
}, count)

assert.Equal(t, map[model.ServerAddress]common.Set[int64]{}, deletedServers)
assert.Equal(t, map[string]ServerContext{}, deletedServers)
}

func TestClusterRebalance_Single(t *testing.T) {
4 changes: 2 additions & 2 deletions coordinator/impl/coordinator.go
@@ -452,12 +452,12 @@ func (c *coordinator) handleClusterConfigUpdated() error {
slog.Any("metadataVersion", c.metadataVersion),
)

c.checkClusterNodeChanges(newClusterConfig)

for _, sc := range c.shardControllers {
sc.SyncServerAddress()
}

c.checkClusterNodeChanges(newClusterConfig)

clusterStatus, shardsToAdd, shardsToDelete := applyClusterChanges(&newClusterConfig, c.clusterStatus)

for shard, namespace := range shardsToAdd {
5 changes: 5 additions & 0 deletions coordinator/impl/coordinator_e2e_test.go
@@ -813,6 +813,11 @@ func TestCoordinator_RefreshServerInfo(t *testing.T) {
})
}

// check if the new config will trigger node swap
status := c.ClusterStatus()
actions := rebalanceCluster(clusterServer, &status)
assert.EqualValues(t, 0, len(actions))

clusterConfig.Servers = clusterServer
configChangesCh <- nil

