Skip to content

Commit

Permalink
mcs: remove wait api service ready (#8476)
Browse files Browse the repository at this point in the history
ref #8477

Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx authored Aug 12, 2024
1 parent 3f32f54 commit f3602e3
Show file tree
Hide file tree
Showing 61 changed files with 519 additions and 663 deletions.
24 changes: 12 additions & 12 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/slice"
Expand Down Expand Up @@ -118,20 +118,20 @@ func NewKeyspaceManager(
cluster: cluster,
config: config,
kgm: kgm,
nextPatrolStartID: utils.DefaultKeyspaceID,
nextPatrolStartID: constant.DefaultKeyspaceID,
}
}

// Bootstrap saves default keyspace info.
func (manager *Manager) Bootstrap() error {
// Split Keyspace Region for default keyspace.
if err := manager.splitKeyspaceRegion(utils.DefaultKeyspaceID, false); err != nil {
if err := manager.splitKeyspaceRegion(constant.DefaultKeyspaceID, false); err != nil {
return err
}
now := time.Now().Unix()
defaultKeyspaceMeta := &keyspacepb.KeyspaceMeta{
Id: utils.DefaultKeyspaceID,
Name: utils.DefaultKeyspaceName,
Id: constant.DefaultKeyspaceID,
Name: constant.DefaultKeyspaceName,
State: keyspacepb.KeyspaceState_ENABLED,
CreatedAt: now,
StateChangedAt: now,
Expand Down Expand Up @@ -543,7 +543,7 @@ func (manager *Manager) UpdateKeyspaceConfig(name string, mutations []*Mutation)
// It returns error if saving failed, operation not allowed, or if keyspace not exists.
func (manager *Manager) UpdateKeyspaceState(name string, newState keyspacepb.KeyspaceState, now int64) (*keyspacepb.KeyspaceMeta, error) {
// Changing the state of default keyspace is not allowed.
if name == utils.DefaultKeyspaceName {
if name == constant.DefaultKeyspaceName {
log.Warn("[keyspace] failed to update keyspace config",
zap.Error(ErrModifyDefaultKeyspace),
)
Expand Down Expand Up @@ -595,7 +595,7 @@ func (manager *Manager) UpdateKeyspaceState(name string, newState keyspacepb.Key
// It returns error if saving failed, operation not allowed, or if keyspace not exists.
func (manager *Manager) UpdateKeyspaceStateByID(id uint32, newState keyspacepb.KeyspaceState, now int64) (*keyspacepb.KeyspaceMeta, error) {
// Changing the state of default keyspace is not allowed.
if id == utils.DefaultKeyspaceID {
if id == constant.DefaultKeyspaceID {
log.Warn("[keyspace] failed to update keyspace config",
zap.Error(ErrModifyDefaultKeyspace),
)
Expand Down Expand Up @@ -724,18 +724,18 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
var defaultKeyspaceGroup *endpoint.KeyspaceGroup
err = manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
var err error
defaultKeyspaceGroup, err = manager.kgm.store.LoadKeyspaceGroup(txn, utils.DefaultKeyspaceGroupID)
defaultKeyspaceGroup, err = manager.kgm.store.LoadKeyspaceGroup(txn, constant.DefaultKeyspaceGroupID)
if err != nil {
return err
}
if defaultKeyspaceGroup == nil {
return errors.Errorf("default keyspace group %d not found", utils.DefaultKeyspaceGroupID)
return errors.Errorf("default keyspace group %d not found", constant.DefaultKeyspaceGroupID)
}
if defaultKeyspaceGroup.IsSplitting() {
return ErrKeyspaceGroupInSplit(utils.DefaultKeyspaceGroupID)
return ErrKeyspaceGroupInSplit(constant.DefaultKeyspaceGroupID)
}
if defaultKeyspaceGroup.IsMerging() {
return ErrKeyspaceGroupInMerging(utils.DefaultKeyspaceGroupID)
return ErrKeyspaceGroupInMerging(constant.DefaultKeyspaceGroupID)
}
keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, etcdutil.MaxEtcdTxnOps)
if err != nil {
Expand Down Expand Up @@ -784,7 +784,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
// Only save the keyspace group meta if any keyspace is assigned to it.
assigned = true
}
ks.Config[TSOKeyspaceGroupIDKey] = strconv.FormatUint(uint64(utils.DefaultKeyspaceGroupID), 10)
ks.Config[TSOKeyspaceGroupIDKey] = strconv.FormatUint(uint64(constant.DefaultKeyspaceGroupID), 10)
err = manager.store.SaveKeyspaceMeta(txn, ks)
if err != nil {
log.Error("[keyspace] failed to save keyspace meta during patrol",
Expand Down
18 changes: 9 additions & 9 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
Expand Down Expand Up @@ -187,7 +187,7 @@ func (suite *keyspaceTestSuite) TestUpdateKeyspaceConfig() {
re.Error(err)
}
// Changing config of DEFAULT keyspace is allowed.
updated, err := manager.UpdateKeyspaceConfig(utils.DefaultKeyspaceName, mutations)
updated, err := manager.UpdateKeyspaceConfig(constant.DefaultKeyspaceName, mutations)
re.NoError(err)
// remove auto filled fields
delete(updated.Config, TSOKeyspaceGroupIDKey)
Expand Down Expand Up @@ -227,7 +227,7 @@ func (suite *keyspaceTestSuite) TestUpdateKeyspaceState() {
_, err = manager.UpdateKeyspaceState(createRequest.Name, keyspacepb.KeyspaceState_ENABLED, newTime)
re.Error(err)
// Changing state of DEFAULT keyspace is not allowed.
_, err = manager.UpdateKeyspaceState(utils.DefaultKeyspaceName, keyspacepb.KeyspaceState_DISABLED, newTime)
_, err = manager.UpdateKeyspaceState(constant.DefaultKeyspaceName, keyspacepb.KeyspaceState_DISABLED, newTime)
re.Error(err)
}
}
Expand Down Expand Up @@ -392,15 +392,15 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignment() {
})
re.NoError(err)
// Check if the keyspace is not attached to the default group.
defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(constant.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(111))
// Patrol the keyspace assignment.
err = suite.manager.PatrolKeyspaceAssignment(0, 0)
re.NoError(err)
// Check if the keyspace is attached to the default group.
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(constant.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
re.Contains(defaultKeyspaceGroup.Keyspaces, uint32(111))
Expand All @@ -421,7 +421,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
re.NoError(err)
}
// Check if all the keyspaces are not attached to the default group.
defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(constant.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < etcdutil.MaxEtcdTxnOps*2+1; i++ {
Expand All @@ -431,7 +431,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
err = suite.manager.PatrolKeyspaceAssignment(0, 0)
re.NoError(err)
// Check if all the keyspaces are attached to the default group.
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(constant.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < etcdutil.MaxEtcdTxnOps*2+1; i++ {
Expand All @@ -454,7 +454,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() {
re.NoError(err)
}
// Check if all the keyspaces are not attached to the default group.
defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(constant.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < etcdutil.MaxEtcdTxnOps*2+1; i++ {
Expand All @@ -469,7 +469,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() {
err = suite.manager.PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID)
re.NoError(err)
// Check if only the keyspaces within the range are attached to the default group.
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(constant.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < etcdutil.MaxEtcdTxnOps*2+1; i++ {
Expand Down
38 changes: 19 additions & 19 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/balancer"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
Expand Down Expand Up @@ -90,7 +90,7 @@ func NewKeyspaceGroupManager(
ctx, cancel := context.WithCancel(ctx)
groups := make(map[endpoint.UserKind]*indexedHeap)
for i := 0; i < int(endpoint.UserKindCount); i++ {
groups[endpoint.UserKind(i)] = newIndexedHeap(int(utils.MaxKeyspaceGroupCountInUse))
groups[endpoint.UserKind(i)] = newIndexedHeap(int(constant.MaxKeyspaceGroupCountInUse))
}
m := &GroupManager{
ctx: ctx,
Expand Down Expand Up @@ -119,9 +119,9 @@ func (m *GroupManager) Bootstrap(ctx context.Context) error {
// leave the replica/member list empty. The TSO service will assign the default keyspace group replica
// to every tso node/pod by default.
defaultKeyspaceGroup := &endpoint.KeyspaceGroup{
ID: utils.DefaultKeyspaceGroupID,
ID: constant.DefaultKeyspaceGroupID,
UserKind: endpoint.Basic.String(),
Keyspaces: []uint32{utils.DefaultKeyspaceID},
Keyspaces: []uint32{constant.DefaultKeyspaceID},
}

m.Lock()
Expand All @@ -134,7 +134,7 @@ func (m *GroupManager) Bootstrap(ctx context.Context) error {
}

// Load all the keyspace groups from the storage and add to the respective userKind groups.
groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0)
groups, err := m.store.LoadKeyspaceGroups(constant.DefaultKeyspaceGroupID, 0)
if err != nil {
return err
}
Expand Down Expand Up @@ -185,7 +185,7 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
continue
}
}
groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0)
groups, err := m.store.LoadKeyspaceGroups(constant.DefaultKeyspaceGroupID, 0)
if err != nil {
log.Error("failed to load all keyspace groups", zap.Error(err))
continue
Expand All @@ -205,8 +205,8 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
if numExistMembers != 0 && numExistMembers == len(group.Members) && numExistMembers == m.GetNodesCount() {
continue
}
if numExistMembers < utils.DefaultKeyspaceGroupReplicaCount {
nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, existMembers, utils.DefaultKeyspaceGroupReplicaCount)
if numExistMembers < constant.DefaultKeyspaceGroupReplicaCount {
nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, existMembers, constant.DefaultKeyspaceGroupReplicaCount)
if err != nil {
log.Error("failed to alloc nodes for keyspace group", zap.Uint32("keyspace-group-id", group.ID), zap.Error(err))
continue
Expand Down Expand Up @@ -430,7 +430,7 @@ func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupI
failpoint.Inject("externalAllocNode", func(val failpoint.Value) {
failpointOnce.Do(func() {
addrs := val.(string)
_ = m.SetNodesForKeyspaceGroup(utils.DefaultKeyspaceGroupID, strings.Split(addrs, ","))
_ = m.SetNodesForKeyspaceGroup(constant.DefaultKeyspaceGroupID, strings.Split(addrs, ","))
})
})
m.Lock()
Expand Down Expand Up @@ -574,7 +574,7 @@ func (m *GroupManager) SplitKeyspaceGroupByID(
return err
}
// Check if the source keyspace group has enough replicas.
if len(splitSourceKg.Members) < utils.DefaultKeyspaceGroupReplicaCount {
if len(splitSourceKg.Members) < constant.DefaultKeyspaceGroupReplicaCount {
return ErrKeyspaceGroupNotEnoughReplicas
}
// Check if the new keyspace group already exists.
Expand Down Expand Up @@ -634,7 +634,7 @@ func buildSplitKeyspaces(
oldKeyspaceMap[keyspace] = struct{}{}
}
for _, keyspace := range new {
if keyspace == utils.DefaultKeyspaceID {
if keyspace == constant.DefaultKeyspaceID {
return nil, nil, ErrModifyDefaultKeyspace
}
if _, ok := oldKeyspaceMap[keyspace]; !ok {
Expand Down Expand Up @@ -670,7 +670,7 @@ func buildSplitKeyspaces(
newKeyspaceMap = make(map[uint32]struct{}, newNum)
)
for _, keyspace := range old {
if keyspace == utils.DefaultKeyspaceID {
if keyspace == constant.DefaultKeyspaceID {
// The source keyspace group must be the default keyspace group and we always keep the default
// keyspace in the default keyspace group.
continue
Expand Down Expand Up @@ -778,7 +778,7 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, existMembers map[st
for addr := range existMembers {
nodes = append(nodes, endpoint.KeyspaceGroupMember{
Address: addr,
Priority: utils.DefaultKeyspaceGroupReplicaPriority,
Priority: constant.DefaultKeyspaceGroupReplicaPriority,
})
}

Expand All @@ -804,7 +804,7 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, existMembers map[st
existMembers[addr] = struct{}{}
nodes = append(nodes, endpoint.KeyspaceGroupMember{
Address: addr,
Priority: utils.DefaultKeyspaceGroupReplicaPriority,
Priority: constant.DefaultKeyspaceGroupReplicaPriority,
})
}
kg.Members = nodes
Expand Down Expand Up @@ -844,7 +844,7 @@ func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error
for _, node := range nodes {
members = append(members, endpoint.KeyspaceGroupMember{
Address: node,
Priority: utils.DefaultKeyspaceGroupReplicaPriority,
Priority: constant.DefaultKeyspaceGroupReplicaPriority,
})
}
kg.Members = members
Expand Down Expand Up @@ -923,7 +923,7 @@ func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uin
if (mergeListNum+1)*2 > etcdutil.MaxEtcdTxnOps {
return ErrExceedMaxEtcdTxnOps
}
if slice.Contains(mergeList, utils.DefaultKeyspaceGroupID) {
if slice.Contains(mergeList, constant.DefaultKeyspaceGroupID) {
return ErrModifyDefaultKeyspaceGroup
}
var (
Expand Down Expand Up @@ -1071,7 +1071,7 @@ func (m *GroupManager) MergeAllIntoDefaultKeyspaceGroup() error {
groupsToMerge = make([]uint32, 0, maxBatchSize)
)
for idx, group := range groups.GetAll() {
if group.ID == utils.DefaultKeyspaceGroupID {
if group.ID == constant.DefaultKeyspaceGroupID {
continue
}
groupsToMerge = append(groupsToMerge, group.ID)
Expand All @@ -1085,7 +1085,7 @@ func (m *GroupManager) MergeAllIntoDefaultKeyspaceGroup() error {
zap.Int("merged-group-num", mergedGroupNum),
zap.Int("unmerged-group-num", unmergedGroupNum))
// Reach the batch size, merge them into the default keyspace group.
if err := m.MergeKeyspaceGroups(utils.DefaultKeyspaceGroupID, groupsToMerge); err != nil {
if err := m.MergeKeyspaceGroups(constant.DefaultKeyspaceGroupID, groupsToMerge); err != nil {
log.Error("failed to merge all keyspace groups into the default one",
zap.Int("index", idx),
zap.Int("batch-size", len(groupsToMerge)),
Expand All @@ -1112,7 +1112,7 @@ func (m *GroupManager) MergeAllIntoDefaultKeyspaceGroup() error {
ticker.Stop()
return nil
case <-ticker.C:
kg, err := m.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
kg, err := m.GetKeyspaceGroupByID(constant.DefaultKeyspaceGroupID)
if err != nil {
log.Error("failed to check the default keyspace group merge state",
zap.Int("index", idx),
Expand Down
14 changes: 7 additions & 7 deletions pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/pkg/mock/mockconfig"
"github.com/tikv/pd/pkg/mock/mockid"
Expand Down Expand Up @@ -87,7 +87,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupOperations() {
re.NoError(err)
re.Len(kgs, 2)
// get the default keyspace group
kg, err := suite.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
kg, err := suite.kgm.GetKeyspaceGroupByID(constant.DefaultKeyspaceGroupID)
re.NoError(err)
re.Equal(uint32(0), kg.ID)
re.Equal(endpoint.Basic.String(), kg.UserKind)
Expand Down Expand Up @@ -248,13 +248,13 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
ID: uint32(2),
UserKind: endpoint.Standard.String(),
Keyspaces: []uint32{111, 222, 333},
Members: make([]endpoint.KeyspaceGroupMember, utils.DefaultKeyspaceGroupReplicaCount),
Members: make([]endpoint.KeyspaceGroupMember, constant.DefaultKeyspaceGroupReplicaCount),
},
}
err := suite.kgm.CreateKeyspaceGroups(keyspaceGroups)
re.NoError(err)
// split the default keyspace
err = suite.kgm.SplitKeyspaceGroupByID(0, 4, []uint32{utils.DefaultKeyspaceID})
err = suite.kgm.SplitKeyspaceGroupByID(0, 4, []uint32{constant.DefaultKeyspaceID})
re.ErrorIs(err, ErrModifyDefaultKeyspace)
// split the keyspace group 1 to 4
err = suite.kgm.SplitKeyspaceGroupByID(1, 4, []uint32{444})
Expand Down Expand Up @@ -341,7 +341,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplitRange() {
ID: uint32(2),
UserKind: endpoint.Standard.String(),
Keyspaces: []uint32{111, 333, 444, 555, 666},
Members: make([]endpoint.KeyspaceGroupMember, utils.DefaultKeyspaceGroupReplicaCount),
Members: make([]endpoint.KeyspaceGroupMember, constant.DefaultKeyspaceGroupReplicaCount),
},
}
err := suite.kgm.CreateKeyspaceGroups(keyspaceGroups)
Expand Down Expand Up @@ -388,7 +388,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupMerge() {
ID: uint32(1),
UserKind: endpoint.Basic.String(),
Keyspaces: []uint32{111, 222, 333},
Members: make([]endpoint.KeyspaceGroupMember, utils.DefaultKeyspaceGroupReplicaCount),
Members: make([]endpoint.KeyspaceGroupMember, constant.DefaultKeyspaceGroupReplicaCount),
},
{
ID: uint32(3),
Expand Down Expand Up @@ -453,7 +453,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupMerge() {
err = suite.kgm.MergeKeyspaceGroups(1, make([]uint32, etcdutil.MaxEtcdTxnOps/2))
re.ErrorIs(err, ErrExceedMaxEtcdTxnOps)
// merge the default keyspace group
err = suite.kgm.MergeKeyspaceGroups(1, []uint32{utils.DefaultKeyspaceGroupID})
err = suite.kgm.MergeKeyspaceGroups(1, []uint32{constant.DefaultKeyspaceGroupID})
re.ErrorIs(err, ErrModifyDefaultKeyspaceGroup)
}

Expand Down
Loading

0 comments on commit f3602e3

Please sign in to comment.