diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index 5ed9747e923..372356d716c 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -36,6 +36,7 @@ import ( "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" + "github.com/tikv/pd/pkg/utils/typeutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" @@ -181,10 +182,6 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) { return case <-ticker.C: } - countOfNodes := m.GetNodesCount() - if countOfNodes < utils.DefaultKeyspaceGroupReplicaCount { - continue - } groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0) if err != nil { log.Error("failed to load all keyspace groups", zap.Error(err)) @@ -194,23 +191,26 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) { if len(groups) == 0 { continue } - withError := false for _, group := range groups { - if len(group.Members) < utils.DefaultKeyspaceGroupReplicaCount { - nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, utils.DefaultKeyspaceGroupReplicaCount) + existMembers := make(map[string]struct{}) + for _, member := range group.Members { + if exist, addr := m.IsExistNode(member.Address); exist { + existMembers[addr] = struct{}{} + } + } + numExistMembers := len(existMembers) + if numExistMembers != 0 && numExistMembers == len(group.Members) && numExistMembers == m.GetNodesCount() { + continue + } + if numExistMembers < utils.DefaultKeyspaceGroupReplicaCount { + nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, existMembers, utils.DefaultKeyspaceGroupReplicaCount) if err != nil { - withError = true log.Error("failed to alloc nodes for keyspace group", zap.Uint32("keyspace-group-id", group.ID), zap.Error(err)) continue } group.Members = nodes } } - if !withError { - // all keyspace groups have equal or more than default replica count - log.Info("all keyspace groups have equal or more than default replica count, stop to alloc node") - return - } } } @@ -745,7 +745,7 @@ func (m *GroupManager) GetNodesCount() int { } // AllocNodesForKeyspaceGroup allocates nodes for the keyspace group. -func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount int) ([]endpoint.KeyspaceGroupMember, error) { +func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, existMembers map[string]struct{}, desiredReplicaCount int) ([]endpoint.KeyspaceGroupMember, error) { m.Lock() defer m.Unlock() ctx, cancel := context.WithTimeout(m.ctx, allocNodesTimeout) @@ -770,32 +770,34 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount if kg.IsMerging() { return ErrKeyspaceGroupInMerging(id) } - exists := make(map[string]struct{}) - for _, member := range kg.Members { - exists[member.Address] = struct{}{} - nodes = append(nodes, member) - } - if len(exists) >= desiredReplicaCount { - return nil + + for addr := range existMembers { + nodes = append(nodes, endpoint.KeyspaceGroupMember{ + Address: addr, + Priority: utils.DefaultKeyspaceGroupReplicaPriority, + }) } - for len(exists) < desiredReplicaCount { + + for len(existMembers) < desiredReplicaCount { select { case <-ctx.Done(): return nil case <-ticker.C: } - countOfNodes := m.GetNodesCount() - if countOfNodes < desiredReplicaCount || countOfNodes == 0 { // double check + if m.GetNodesCount() == 0 { // double check return ErrNoAvailableNode } + if len(existMembers) == m.GetNodesCount() { + break + } addr := m.nodesBalancer.Next() if addr == "" { return ErrNoAvailableNode } - if _, ok := exists[addr]; ok { + if _, ok := existMembers[addr]; ok { continue } - exists[addr] = struct{}{} + existMembers[addr] = struct{}{} nodes = append(nodes, endpoint.KeyspaceGroupMember{ Address: addr, Priority: utils.DefaultKeyspaceGroupReplicaPriority, @@ -894,14 +896,14 @@ func (m *GroupManager) SetPriorityForKeyspaceGroup(id uint32, node string, prior } // IsExistNode checks if the node exists. -func (m *GroupManager) IsExistNode(addr string) bool { +func (m *GroupManager) IsExistNode(addr string) (bool, string) { nodes := m.nodesBalancer.GetAll() for _, node := range nodes { - if node == addr { - return true + if typeutil.EqualBaseURLs(node, addr) { + return true, node } } - return false + return false, "" } // MergeKeyspaceGroups merges the keyspace group in the list into the target keyspace group. diff --git a/pkg/utils/typeutil/comparison.go b/pkg/utils/typeutil/comparison.go index c976ac47102..f4fb602a2f7 100644 --- a/pkg/utils/typeutil/comparison.go +++ b/pkg/utils/typeutil/comparison.go @@ -17,6 +17,7 @@ package typeutil import ( "math" "sort" + "strings" "time" ) @@ -78,3 +79,13 @@ func AreStringSlicesEquivalent(a, b []string) bool { func Float64Equal(a, b float64) bool { return math.Abs(a-b) <= 1e-6 } + +// EqualBaseURLs compares two URLs without scheme. +func EqualBaseURLs(url1, url2 string) bool { + return TrimScheme(url1) == TrimScheme(url2) +} + +// TrimScheme trims the scheme from the URL. +func TrimScheme(s string) string { + return strings.TrimPrefix(strings.TrimPrefix(s, "https://"), "http://") +} diff --git a/pkg/utils/typeutil/comparison_test.go b/pkg/utils/typeutil/comparison_test.go index b296405b3d5..05f0e5c0baf 100644 --- a/pkg/utils/typeutil/comparison_test.go +++ b/pkg/utils/typeutil/comparison_test.go @@ -71,3 +71,11 @@ func TestAreStringSlicesEquivalent(t *testing.T) { re.False(AreStringSlicesEquivalent([]string{}, []string{"a", "b"})) re.False(AreStringSlicesEquivalent([]string{"a", "b"}, []string{})) } + +func TestCompareURLsWithoutScheme(t *testing.T) { + re := require.New(t) + re.True(EqualBaseURLs("", "")) + re.True(EqualBaseURLs("http://127.0.0.1", "http://127.0.0.1")) + re.True(EqualBaseURLs("http://127.0.0.1", "https://127.0.0.1")) + re.True(EqualBaseURLs("127.0.0.1", "http://127.0.0.1")) +} diff --git a/server/apiv2/handlers/tso_keyspace_group.go b/server/apiv2/handlers/tso_keyspace_group.go index a9f042687f6..835b4a5242b 100644 --- a/server/apiv2/handlers/tso_keyspace_group.go +++ b/server/apiv2/handlers/tso_keyspace_group.go @@ -413,8 +413,16 @@ func AllocNodesForKeyspaceGroup(c *gin.Context) { c.AbortWithStatusJSON(http.StatusBadRequest, "existed replica is larger than the new replica") return } + + // check if nodes exist + existMembers := make(map[string]struct{}) + for _, member := range keyspaceGroup.Members { + if exist, addr := manager.IsExistNode(member.Address); exist { + existMembers[addr] = struct{}{} + } + } // get the nodes - nodes, err := manager.AllocNodesForKeyspaceGroup(id, allocParams.Replica) + nodes, err := manager.AllocNodesForKeyspaceGroup(id, existMembers, allocParams.Replica) if err != nil { c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) return @@ -460,7 +468,7 @@ func SetNodesForKeyspaceGroup(c *gin.Context) { } // check if node exists for _, node := range setParams.Nodes { - if !manager.IsExistNode(node) { + if exist, _ := manager.IsExistNode(node); !exist { c.AbortWithStatusJSON(http.StatusBadRequest, "node does not exist") return } diff --git a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go index ccec0a7cdc0..33e775211d2 100644 --- a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go +++ b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go @@ -318,6 +318,102 @@ func (suite *keyspaceGroupTestSuite) TestDefaultKeyspaceGroup() { } } +func (suite *keyspaceGroupTestSuite) TestAllocNodes() { + re := suite.Require() + // add three nodes. + nodes := make(map[string]bs.Server) + var cleanups []func() + defer func() { + for _, cleanup := range cleanups { + cleanup() + } + }() + for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount+1; i++ { + s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) + cleanups = append(cleanups, cleanup) + nodes[s.GetAddr()] = s + } + tests.WaitForPrimaryServing(re, nodes) + + // create a keyspace group. + kgs := &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: uint32(1), + UserKind: endpoint.Standard.String(), + }, + }} + code := suite.tryCreateKeyspaceGroup(re, kgs) + re.Equal(http.StatusOK, code) + + // alloc nodes for the keyspace group + var kg *endpoint.KeyspaceGroup + testutil.Eventually(re, func() bool { + kg, code = suite.tryGetKeyspaceGroup(re, utils.DefaultKeyspaceGroupID) + return code == http.StatusOK && kg != nil && len(kg.Members) == utils.DefaultKeyspaceGroupReplicaCount + }) + stopNode := kg.Members[0].Address + // close one of members + nodes[stopNode].Close() + + // the member list will be updated + testutil.Eventually(re, func() bool { + kg, code = suite.tryGetKeyspaceGroup(re, utils.DefaultKeyspaceGroupID) + for _, member := range kg.Members { + if member.Address == stopNode { + return false + } + } + return code == http.StatusOK && kg != nil && len(kg.Members) == utils.DefaultKeyspaceGroupReplicaCount + }) +} + +func (suite *keyspaceGroupTestSuite) TestAllocOneNode() { + re := suite.Require() + // add one tso server + nodes := make(map[string]bs.Server) + oldTSOServer, cleanupOldTSOserver := tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) + defer cleanupOldTSOserver() + nodes[oldTSOServer.GetAddr()] = oldTSOServer + + tests.WaitForPrimaryServing(re, nodes) + + // create a keyspace group. + kgs := &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: uint32(1), + UserKind: endpoint.Standard.String(), + }, + }} + code := suite.tryCreateKeyspaceGroup(re, kgs) + re.Equal(http.StatusOK, code) + + // alloc nodes for the keyspace group + var kg *endpoint.KeyspaceGroup + testutil.Eventually(re, func() bool { + kg, code = suite.tryGetKeyspaceGroup(re, utils.DefaultKeyspaceGroupID) + return code == http.StatusOK && kg != nil && len(kg.Members) == 1 + }) + stopNode := kg.Members[0].Address + // close old tso server + nodes[stopNode].Close() + + // create a new tso server + newTSOServer, cleanupNewTSOServer := tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) + defer cleanupNewTSOServer() + nodes[newTSOServer.GetAddr()] = newTSOServer + + tests.WaitForPrimaryServing(re, nodes) + + // the member list will be updated + testutil.Eventually(re, func() bool { + kg, code = suite.tryGetKeyspaceGroup(re, utils.DefaultKeyspaceGroupID) + if len(kg.Members) != 0 && kg.Members[0].Address == stopNode { + return false + } + return code == http.StatusOK && kg != nil && len(kg.Members) == 1 + }) +} + func (suite *keyspaceGroupTestSuite) tryAllocNodesForKeyspaceGroup(re *require.Assertions, id int, request *handlers.AllocNodesForKeyspaceGroupParams) ([]endpoint.KeyspaceGroupMember, int) { data, err := json.Marshal(request) re.NoError(err)