Skip to content

Commit

Permalink
mcs: update node every restart (#8302) (#8468)
Browse files Browse the repository at this point in the history
close #8154

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: Ryan Leung <[email protected]>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people committed Aug 1, 2024
1 parent 9fe738a commit ae905fa
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 32 deletions.
62 changes: 32 additions & 30 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
Expand Down Expand Up @@ -181,10 +182,6 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
return
case <-ticker.C:
}
countOfNodes := m.GetNodesCount()
if countOfNodes < utils.DefaultKeyspaceGroupReplicaCount {
continue
}
groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0)
if err != nil {
log.Error("failed to load all keyspace groups", zap.Error(err))
Expand All @@ -194,23 +191,26 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
if len(groups) == 0 {
continue
}
withError := false
for _, group := range groups {
if len(group.Members) < utils.DefaultKeyspaceGroupReplicaCount {
nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, utils.DefaultKeyspaceGroupReplicaCount)
existMembers := make(map[string]struct{})
for _, member := range group.Members {
if exist, addr := m.IsExistNode(member.Address); exist {
existMembers[addr] = struct{}{}
}
}
numExistMembers := len(existMembers)
if numExistMembers != 0 && numExistMembers == len(group.Members) && numExistMembers == m.GetNodesCount() {
continue
}
if numExistMembers < utils.DefaultKeyspaceGroupReplicaCount {
nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, existMembers, utils.DefaultKeyspaceGroupReplicaCount)
if err != nil {
withError = true
log.Error("failed to alloc nodes for keyspace group", zap.Uint32("keyspace-group-id", group.ID), zap.Error(err))
continue
}
group.Members = nodes
}
}
if !withError {
// all keyspace groups have equal or more than default replica count
log.Info("all keyspace groups have equal or more than default replica count, stop to alloc node")
return
}
}
}

Expand Down Expand Up @@ -745,7 +745,7 @@ func (m *GroupManager) GetNodesCount() int {
}

// AllocNodesForKeyspaceGroup allocates nodes for the keyspace group.
func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount int) ([]endpoint.KeyspaceGroupMember, error) {
func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, existMembers map[string]struct{}, desiredReplicaCount int) ([]endpoint.KeyspaceGroupMember, error) {
m.Lock()
defer m.Unlock()
ctx, cancel := context.WithTimeout(m.ctx, allocNodesTimeout)
Expand All @@ -770,32 +770,34 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount
if kg.IsMerging() {
return ErrKeyspaceGroupInMerging(id)
}
exists := make(map[string]struct{})
for _, member := range kg.Members {
exists[member.Address] = struct{}{}
nodes = append(nodes, member)
}
if len(exists) >= desiredReplicaCount {
return nil

for addr := range existMembers {
nodes = append(nodes, endpoint.KeyspaceGroupMember{
Address: addr,
Priority: utils.DefaultKeyspaceGroupReplicaPriority,
})
}
for len(exists) < desiredReplicaCount {

for len(existMembers) < desiredReplicaCount {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
}
countOfNodes := m.GetNodesCount()
if countOfNodes < desiredReplicaCount || countOfNodes == 0 { // double check
if m.GetNodesCount() == 0 { // double check
return ErrNoAvailableNode
}
if len(existMembers) == m.GetNodesCount() {
break
}
addr := m.nodesBalancer.Next()
if addr == "" {
return ErrNoAvailableNode
}
if _, ok := exists[addr]; ok {
if _, ok := existMembers[addr]; ok {
continue
}
exists[addr] = struct{}{}
existMembers[addr] = struct{}{}
nodes = append(nodes, endpoint.KeyspaceGroupMember{
Address: addr,
Priority: utils.DefaultKeyspaceGroupReplicaPriority,
Expand Down Expand Up @@ -894,14 +896,14 @@ func (m *GroupManager) SetPriorityForKeyspaceGroup(id uint32, node string, prior
}

// IsExistNode checks if the node exists.
func (m *GroupManager) IsExistNode(addr string) bool {
func (m *GroupManager) IsExistNode(addr string) (bool, string) {
nodes := m.nodesBalancer.GetAll()
for _, node := range nodes {
if node == addr {
return true
if typeutil.EqualBaseURLs(node, addr) {
return true, node
}
}
return false
return false, ""
}

// MergeKeyspaceGroups merges the keyspace group in the list into the target keyspace group.
Expand Down
11 changes: 11 additions & 0 deletions pkg/utils/typeutil/comparison.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package typeutil
import (
"math"
"sort"
"strings"
"time"
)

Expand Down Expand Up @@ -78,3 +79,13 @@ func AreStringSlicesEquivalent(a, b []string) bool {
func Float64Equal(a, b float64) bool {
return math.Abs(a-b) <= 1e-6
}

// EqualBaseURLs compares two URLs without scheme.
func EqualBaseURLs(url1, url2 string) bool {
return TrimScheme(url1) == TrimScheme(url2)
}

// TrimScheme trims the scheme from the URL.
func TrimScheme(s string) string {
return strings.TrimPrefix(strings.TrimPrefix(s, "https://"), "http://")
}
8 changes: 8 additions & 0 deletions pkg/utils/typeutil/comparison_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,11 @@ func TestAreStringSlicesEquivalent(t *testing.T) {
re.False(AreStringSlicesEquivalent([]string{}, []string{"a", "b"}))
re.False(AreStringSlicesEquivalent([]string{"a", "b"}, []string{}))
}

func TestCompareURLsWithoutScheme(t *testing.T) {
re := require.New(t)
re.True(EqualBaseURLs("", ""))
re.True(EqualBaseURLs("http://127.0.0.1", "http://127.0.0.1"))
re.True(EqualBaseURLs("http://127.0.0.1", "https://127.0.0.1"))
re.True(EqualBaseURLs("127.0.0.1", "http://127.0.0.1"))
}
12 changes: 10 additions & 2 deletions server/apiv2/handlers/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,16 @@ func AllocNodesForKeyspaceGroup(c *gin.Context) {
c.AbortWithStatusJSON(http.StatusBadRequest, "existed replica is larger than the new replica")
return
}

// check if nodes exist
existMembers := make(map[string]struct{})
for _, member := range keyspaceGroup.Members {
if exist, addr := manager.IsExistNode(member.Address); exist {
existMembers[addr] = struct{}{}
}
}
// get the nodes
nodes, err := manager.AllocNodesForKeyspaceGroup(id, allocParams.Replica)
nodes, err := manager.AllocNodesForKeyspaceGroup(id, existMembers, allocParams.Replica)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
Expand Down Expand Up @@ -460,7 +468,7 @@ func SetNodesForKeyspaceGroup(c *gin.Context) {
}
// check if node exists
for _, node := range setParams.Nodes {
if !manager.IsExistNode(node) {
if exist, _ := manager.IsExistNode(node); !exist {
c.AbortWithStatusJSON(http.StatusBadRequest, "node does not exist")
return
}
Expand Down
96 changes: 96 additions & 0 deletions tests/integrations/mcs/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,102 @@ func (suite *keyspaceGroupTestSuite) TestDefaultKeyspaceGroup() {
}
}

func (suite *keyspaceGroupTestSuite) TestAllocNodes() {
re := suite.Require()
// add three nodes.
nodes := make(map[string]bs.Server)
var cleanups []func()
defer func() {
for _, cleanup := range cleanups {
cleanup()
}
}()
for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount+1; i++ {
s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
cleanups = append(cleanups, cleanup)
nodes[s.GetAddr()] = s
}
tests.WaitForPrimaryServing(re, nodes)

// create a keyspace group.
kgs := &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{
{
ID: uint32(1),
UserKind: endpoint.Standard.String(),
},
}}
code := suite.tryCreateKeyspaceGroup(re, kgs)
re.Equal(http.StatusOK, code)

// alloc nodes for the keyspace group
var kg *endpoint.KeyspaceGroup
testutil.Eventually(re, func() bool {
kg, code = suite.tryGetKeyspaceGroup(re, utils.DefaultKeyspaceGroupID)
return code == http.StatusOK && kg != nil && len(kg.Members) == utils.DefaultKeyspaceGroupReplicaCount
})
stopNode := kg.Members[0].Address
// close one of members
nodes[stopNode].Close()

// the member list will be updated
testutil.Eventually(re, func() bool {
kg, code = suite.tryGetKeyspaceGroup(re, utils.DefaultKeyspaceGroupID)
for _, member := range kg.Members {
if member.Address == stopNode {
return false
}
}
return code == http.StatusOK && kg != nil && len(kg.Members) == utils.DefaultKeyspaceGroupReplicaCount
})
}

func (suite *keyspaceGroupTestSuite) TestAllocOneNode() {
re := suite.Require()
// add one tso server
nodes := make(map[string]bs.Server)
oldTSOServer, cleanupOldTSOserver := tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
defer cleanupOldTSOserver()
nodes[oldTSOServer.GetAddr()] = oldTSOServer

tests.WaitForPrimaryServing(re, nodes)

// create a keyspace group.
kgs := &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{
{
ID: uint32(1),
UserKind: endpoint.Standard.String(),
},
}}
code := suite.tryCreateKeyspaceGroup(re, kgs)
re.Equal(http.StatusOK, code)

// alloc nodes for the keyspace group
var kg *endpoint.KeyspaceGroup
testutil.Eventually(re, func() bool {
kg, code = suite.tryGetKeyspaceGroup(re, utils.DefaultKeyspaceGroupID)
return code == http.StatusOK && kg != nil && len(kg.Members) == 1
})
stopNode := kg.Members[0].Address
// close old tso server
nodes[stopNode].Close()

// create a new tso server
newTSOServer, cleanupNewTSOServer := tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
defer cleanupNewTSOServer()
nodes[newTSOServer.GetAddr()] = newTSOServer

tests.WaitForPrimaryServing(re, nodes)

// the member list will be updated
testutil.Eventually(re, func() bool {
kg, code = suite.tryGetKeyspaceGroup(re, utils.DefaultKeyspaceGroupID)
if len(kg.Members) != 0 && kg.Members[0].Address == stopNode {
return false
}
return code == http.StatusOK && kg != nil && len(kg.Members) == 1
})
}

func (suite *keyspaceGroupTestSuite) tryAllocNodesForKeyspaceGroup(re *require.Assertions, id int, request *handlers.AllocNodesForKeyspaceGroupParams) ([]endpoint.KeyspaceGroupMember, int) {
data, err := json.Marshal(request)
re.NoError(err)
Expand Down

0 comments on commit ae905fa

Please sign in to comment.