Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: update node every restart (#8302) #8468

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 32 additions & 30 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
Expand Down Expand Up @@ -181,10 +182,6 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
return
case <-ticker.C:
}
countOfNodes := m.GetNodesCount()
if countOfNodes < utils.DefaultKeyspaceGroupReplicaCount {
continue
}
groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0)
if err != nil {
log.Error("failed to load all keyspace groups", zap.Error(err))
Expand All @@ -194,23 +191,26 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
if len(groups) == 0 {
continue
}
withError := false
for _, group := range groups {
if len(group.Members) < utils.DefaultKeyspaceGroupReplicaCount {
nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, utils.DefaultKeyspaceGroupReplicaCount)
existMembers := make(map[string]struct{})
for _, member := range group.Members {
if exist, addr := m.IsExistNode(member.Address); exist {
existMembers[addr] = struct{}{}
}
}
numExistMembers := len(existMembers)
if numExistMembers != 0 && numExistMembers == len(group.Members) && numExistMembers == m.GetNodesCount() {
continue
}
if numExistMembers < utils.DefaultKeyspaceGroupReplicaCount {
nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, existMembers, utils.DefaultKeyspaceGroupReplicaCount)
if err != nil {
withError = true
log.Error("failed to alloc nodes for keyspace group", zap.Uint32("keyspace-group-id", group.ID), zap.Error(err))
continue
}
group.Members = nodes
}
}
if !withError {
// all keyspace groups have equal or more than default replica count
log.Info("all keyspace groups have equal or more than default replica count, stop to alloc node")
return
}
}
}

Expand Down Expand Up @@ -745,7 +745,7 @@ func (m *GroupManager) GetNodesCount() int {
}

// AllocNodesForKeyspaceGroup allocates nodes for the keyspace group.
func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount int) ([]endpoint.KeyspaceGroupMember, error) {
func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, existMembers map[string]struct{}, desiredReplicaCount int) ([]endpoint.KeyspaceGroupMember, error) {
m.Lock()
defer m.Unlock()
ctx, cancel := context.WithTimeout(m.ctx, allocNodesTimeout)
Expand All @@ -770,32 +770,34 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount
if kg.IsMerging() {
return ErrKeyspaceGroupInMerging(id)
}
exists := make(map[string]struct{})
for _, member := range kg.Members {
exists[member.Address] = struct{}{}
nodes = append(nodes, member)
}
if len(exists) >= desiredReplicaCount {
return nil

for addr := range existMembers {
nodes = append(nodes, endpoint.KeyspaceGroupMember{
Address: addr,
Priority: utils.DefaultKeyspaceGroupReplicaPriority,
})
}
for len(exists) < desiredReplicaCount {

for len(existMembers) < desiredReplicaCount {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
}
countOfNodes := m.GetNodesCount()
if countOfNodes < desiredReplicaCount || countOfNodes == 0 { // double check
if m.GetNodesCount() == 0 { // double check
return ErrNoAvailableNode
}
if len(existMembers) == m.GetNodesCount() {
break
}
addr := m.nodesBalancer.Next()
if addr == "" {
return ErrNoAvailableNode
}
if _, ok := exists[addr]; ok {
if _, ok := existMembers[addr]; ok {
continue
}
exists[addr] = struct{}{}
existMembers[addr] = struct{}{}
nodes = append(nodes, endpoint.KeyspaceGroupMember{
Address: addr,
Priority: utils.DefaultKeyspaceGroupReplicaPriority,
Expand Down Expand Up @@ -894,14 +896,14 @@ func (m *GroupManager) SetPriorityForKeyspaceGroup(id uint32, node string, prior
}

// IsExistNode checks if the node exists.
func (m *GroupManager) IsExistNode(addr string) bool {
func (m *GroupManager) IsExistNode(addr string) (bool, string) {
nodes := m.nodesBalancer.GetAll()
for _, node := range nodes {
if node == addr {
return true
if typeutil.EqualBaseURLs(node, addr) {
return true, node
}
}
return false
return false, ""
}

// MergeKeyspaceGroups merges the keyspace group in the list into the target keyspace group.
Expand Down
11 changes: 11 additions & 0 deletions pkg/utils/typeutil/comparison.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package typeutil
import (
"math"
"sort"
"strings"
"time"
)

Expand Down Expand Up @@ -78,3 +79,13 @@ func AreStringSlicesEquivalent(a, b []string) bool {
func Float64Equal(a, b float64) bool {
return math.Abs(a-b) <= 1e-6
}

// EqualBaseURLs compares two URLs without scheme.
func EqualBaseURLs(url1, url2 string) bool {
return TrimScheme(url1) == TrimScheme(url2)
}

// TrimScheme trims the scheme from the URL.
func TrimScheme(s string) string {
return strings.TrimPrefix(strings.TrimPrefix(s, "https://"), "http://")
}
8 changes: 8 additions & 0 deletions pkg/utils/typeutil/comparison_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,11 @@ func TestAreStringSlicesEquivalent(t *testing.T) {
re.False(AreStringSlicesEquivalent([]string{}, []string{"a", "b"}))
re.False(AreStringSlicesEquivalent([]string{"a", "b"}, []string{}))
}

func TestCompareURLsWithoutScheme(t *testing.T) {
re := require.New(t)
re.True(EqualBaseURLs("", ""))
re.True(EqualBaseURLs("http://127.0.0.1", "http://127.0.0.1"))
re.True(EqualBaseURLs("http://127.0.0.1", "https://127.0.0.1"))
re.True(EqualBaseURLs("127.0.0.1", "http://127.0.0.1"))
}
12 changes: 10 additions & 2 deletions server/apiv2/handlers/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,16 @@ func AllocNodesForKeyspaceGroup(c *gin.Context) {
c.AbortWithStatusJSON(http.StatusBadRequest, "existed replica is larger than the new replica")
return
}

// check if nodes exist
existMembers := make(map[string]struct{})
for _, member := range keyspaceGroup.Members {
if exist, addr := manager.IsExistNode(member.Address); exist {
existMembers[addr] = struct{}{}
}
}
// get the nodes
nodes, err := manager.AllocNodesForKeyspaceGroup(id, allocParams.Replica)
nodes, err := manager.AllocNodesForKeyspaceGroup(id, existMembers, allocParams.Replica)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
Expand Down Expand Up @@ -460,7 +468,7 @@ func SetNodesForKeyspaceGroup(c *gin.Context) {
}
// check if node exists
for _, node := range setParams.Nodes {
if !manager.IsExistNode(node) {
if exist, _ := manager.IsExistNode(node); !exist {
c.AbortWithStatusJSON(http.StatusBadRequest, "node does not exist")
return
}
Expand Down
96 changes: 96 additions & 0 deletions tests/integrations/mcs/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,102 @@ func (suite *keyspaceGroupTestSuite) TestDefaultKeyspaceGroup() {
}
}

func (suite *keyspaceGroupTestSuite) TestAllocNodes() {
re := suite.Require()
// add three nodes.
nodes := make(map[string]bs.Server)
var cleanups []func()
defer func() {
for _, cleanup := range cleanups {
cleanup()
}
}()
for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount+1; i++ {
s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
cleanups = append(cleanups, cleanup)
nodes[s.GetAddr()] = s
}
tests.WaitForPrimaryServing(re, nodes)

// create a keyspace group.
kgs := &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{
{
ID: uint32(1),
UserKind: endpoint.Standard.String(),
},
}}
code := suite.tryCreateKeyspaceGroup(re, kgs)
re.Equal(http.StatusOK, code)

// alloc nodes for the keyspace group
var kg *endpoint.KeyspaceGroup
testutil.Eventually(re, func() bool {
kg, code = suite.tryGetKeyspaceGroup(re, utils.DefaultKeyspaceGroupID)
return code == http.StatusOK && kg != nil && len(kg.Members) == utils.DefaultKeyspaceGroupReplicaCount
})
stopNode := kg.Members[0].Address
// close one of members
nodes[stopNode].Close()

// the member list will be updated
testutil.Eventually(re, func() bool {
kg, code = suite.tryGetKeyspaceGroup(re, utils.DefaultKeyspaceGroupID)
for _, member := range kg.Members {
if member.Address == stopNode {
return false
}
}
return code == http.StatusOK && kg != nil && len(kg.Members) == utils.DefaultKeyspaceGroupReplicaCount
})
}

func (suite *keyspaceGroupTestSuite) TestAllocOneNode() {
re := suite.Require()
// add one tso server
nodes := make(map[string]bs.Server)
oldTSOServer, cleanupOldTSOserver := tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
defer cleanupOldTSOserver()
nodes[oldTSOServer.GetAddr()] = oldTSOServer

tests.WaitForPrimaryServing(re, nodes)

// create a keyspace group.
kgs := &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{
{
ID: uint32(1),
UserKind: endpoint.Standard.String(),
},
}}
code := suite.tryCreateKeyspaceGroup(re, kgs)
re.Equal(http.StatusOK, code)

// alloc nodes for the keyspace group
var kg *endpoint.KeyspaceGroup
testutil.Eventually(re, func() bool {
kg, code = suite.tryGetKeyspaceGroup(re, utils.DefaultKeyspaceGroupID)
return code == http.StatusOK && kg != nil && len(kg.Members) == 1
})
stopNode := kg.Members[0].Address
// close old tso server
nodes[stopNode].Close()

// create a new tso server
newTSOServer, cleanupNewTSOServer := tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
defer cleanupNewTSOServer()
nodes[newTSOServer.GetAddr()] = newTSOServer

tests.WaitForPrimaryServing(re, nodes)

// the member list will be updated
testutil.Eventually(re, func() bool {
kg, code = suite.tryGetKeyspaceGroup(re, utils.DefaultKeyspaceGroupID)
if len(kg.Members) != 0 && kg.Members[0].Address == stopNode {
return false
}
return code == http.StatusOK && kg != nil && len(kg.Members) == 1
})
}

func (suite *keyspaceGroupTestSuite) tryAllocNodesForKeyspaceGroup(re *require.Assertions, id int, request *handlers.AllocNodesForKeyspaceGroupParams) ([]endpoint.KeyspaceGroupMember, int) {
data, err := json.Marshal(request)
re.NoError(err)
Expand Down
Loading