diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index 64dd1ba8622..ad61fe0a3fa 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -49,10 +49,10 @@ const ( UserKindKey = "user_kind" // TSOKeyspaceGroupIDKey is the key for tso keyspace group id in keyspace config. TSOKeyspaceGroupIDKey = "tso_keyspace_group_id" - // maxEtcdTxnOps is the max value of operations in an etcd txn. The default limit of etcd txn op is 128. + // MaxEtcdTxnOps is the max value of operations in an etcd txn. The default limit of etcd txn op is 128. // We use 120 here to leave some space for other operations. // See: https://github.com/etcd-io/etcd/blob/d3e43d4de6f6d9575b489dd7850a85e37e0f6b6c/server/embed/config.go#L61 - maxEtcdTxnOps = 120 + MaxEtcdTxnOps = 120 ) // Config is the interface for keyspace config. @@ -681,7 +681,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID zap.Duration("cost", time.Since(start)), zap.Uint64("patrolled-keyspace-count", patrolledKeyspaceCount), zap.Uint64("assigned-keyspace-count", assignedKeyspaceCount), - zap.Int("batch-size", maxEtcdTxnOps), + zap.Int("batch-size", MaxEtcdTxnOps), zap.Uint32("start-keyspace-id", startKeyspaceID), zap.Uint32("end-keyspace-id", endKeyspaceID), zap.Uint32("current-start-id", currentStartID), @@ -705,7 +705,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID if defaultKeyspaceGroup.IsMerging() { return ErrKeyspaceGroupInMerging(utils.DefaultKeyspaceGroupID) } - keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, maxEtcdTxnOps) + keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, MaxEtcdTxnOps) if err != nil { return err } @@ -715,9 +715,9 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID currentStartID = keyspaces[0].GetId() nextStartID = keyspaces[keyspaceNum-1].GetId() + 1 } - // If there are less than `maxEtcdTxnOps` keyspaces or 
the next start ID reaches the end, + // If there are less than `MaxEtcdTxnOps` keyspaces or the next start ID reaches the end, // there is no need to patrol again. - moreToPatrol = keyspaceNum == maxEtcdTxnOps + moreToPatrol = keyspaceNum == MaxEtcdTxnOps var ( assigned = false keyspaceIDsToUnlock = make([]uint32, 0, keyspaceNum) @@ -756,7 +756,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID err = manager.store.SaveKeyspaceMeta(txn, ks) if err != nil { log.Error("[keyspace] failed to save keyspace meta during patrol", - zap.Int("batch-size", maxEtcdTxnOps), + zap.Int("batch-size", MaxEtcdTxnOps), zap.Uint32("start-keyspace-id", startKeyspaceID), zap.Uint32("end-keyspace-id", endKeyspaceID), zap.Uint32("current-start-id", currentStartID), @@ -770,7 +770,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID err = manager.kgm.store.SaveKeyspaceGroup(txn, defaultKeyspaceGroup) if err != nil { log.Error("[keyspace] failed to save default keyspace group meta during patrol", - zap.Int("batch-size", maxEtcdTxnOps), + zap.Int("batch-size", MaxEtcdTxnOps), zap.Uint32("start-keyspace-id", startKeyspaceID), zap.Uint32("end-keyspace-id", endKeyspaceID), zap.Uint32("current-start-id", currentStartID), diff --git a/pkg/keyspace/keyspace_test.go b/pkg/keyspace/keyspace_test.go index b06921e48db..27e7de359ee 100644 --- a/pkg/keyspace/keyspace_test.go +++ b/pkg/keyspace/keyspace_test.go @@ -405,7 +405,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignment() { func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() { re := suite.Require() // Create some keyspaces without any keyspace group. 
- for i := 1; i < maxEtcdTxnOps*2+1; i++ { + for i := 1; i < MaxEtcdTxnOps*2+1; i++ { now := time.Now().Unix() err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{ Id: uint32(i), @@ -420,7 +420,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() { defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) re.NoError(err) re.NotNil(defaultKeyspaceGroup) - for i := 1; i < maxEtcdTxnOps*2+1; i++ { + for i := 1; i < MaxEtcdTxnOps*2+1; i++ { re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i)) } // Patrol the keyspace assignment. @@ -430,7 +430,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() { defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) re.NoError(err) re.NotNil(defaultKeyspaceGroup) - for i := 1; i < maxEtcdTxnOps*2+1; i++ { + for i := 1; i < MaxEtcdTxnOps*2+1; i++ { re.Contains(defaultKeyspaceGroup.Keyspaces, uint32(i)) } } @@ -438,7 +438,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() { func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() { re := suite.Require() // Create some keyspaces without any keyspace group. 
- for i := 1; i < maxEtcdTxnOps*2+1; i++ { + for i := 1; i < MaxEtcdTxnOps*2+1; i++ { now := time.Now().Unix() err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{ Id: uint32(i), @@ -453,14 +453,14 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() { defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) re.NoError(err) re.NotNil(defaultKeyspaceGroup) - for i := 1; i < maxEtcdTxnOps*2+1; i++ { + for i := 1; i < MaxEtcdTxnOps*2+1; i++ { re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i)) } - // Patrol the keyspace assignment with range [maxEtcdTxnOps/2, maxEtcdTxnOps/2+maxEtcdTxnOps+1] + // Patrol the keyspace assignment with range [MaxEtcdTxnOps/2, MaxEtcdTxnOps/2+MaxEtcdTxnOps+1] // to make sure the range crossing the boundary of etcd transaction operation limit. var ( - startKeyspaceID = uint32(maxEtcdTxnOps / 2) - endKeyspaceID = startKeyspaceID + maxEtcdTxnOps + 1 + startKeyspaceID = uint32(MaxEtcdTxnOps / 2) + endKeyspaceID = startKeyspaceID + MaxEtcdTxnOps + 1 ) err = suite.manager.PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID) re.NoError(err) @@ -468,7 +468,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() { defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) re.NoError(err) re.NotNil(defaultKeyspaceGroup) - for i := 1; i < maxEtcdTxnOps*2+1; i++ { + for i := 1; i < MaxEtcdTxnOps*2+1; i++ { keyspaceID := uint32(i) if keyspaceID >= startKeyspaceID && keyspaceID <= endKeyspaceID { re.Contains(defaultKeyspaceGroup.Keyspaces, keyspaceID) diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index d319798738b..e88055ed86d 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -889,7 +889,7 @@ func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uin // - Load and delete the keyspace groups in the merge list. 
// - Load and update the target keyspace group. // So we pre-check the number of operations to avoid exceeding the maximum number of etcd transaction. - if (mergeListNum+1)*2 > maxEtcdTxnOps { + if (mergeListNum+1)*2 > MaxEtcdTxnOps { return ErrExceedMaxEtcdTxnOps } if slice.Contains(mergeList, utils.DefaultKeyspaceGroupID) { @@ -1013,6 +1013,105 @@ func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error { return nil } +// MergeAllIntoDefaultKeyspaceGroup merges all other keyspace groups into the default keyspace group. +func (m *GroupManager) MergeAllIntoDefaultKeyspaceGroup() error { + defer logutil.LogPanic() + // Since we don't take the default keyspace group into account, + // the number of unmerged keyspace groups is -1. + unmergedGroupNum := -1 + // Calculate the total number of keyspace groups to merge. + for _, groups := range m.groups { + unmergedGroupNum += groups.Len() + } + mergedGroupNum := 0 + // Start to merge all keyspace groups into the default one. + for userKind, groups := range m.groups { + mergeNum := groups.Len() + log.Info("start to merge all keyspace groups into the default one", + zap.Stringer("user-kind", userKind), + zap.Int("merge-num", mergeNum), + zap.Int("merged-group-num", mergedGroupNum), + zap.Int("unmerged-group-num", unmergedGroupNum)) + if mergeNum == 0 { + continue + } + var ( + maxBatchSize = MaxEtcdTxnOps/2 - 1 + groupsToMerge = make([]uint32, 0, maxBatchSize) + ) + for idx, group := range groups.GetAll() { + if group.ID == utils.DefaultKeyspaceGroupID { + continue + } + groupsToMerge = append(groupsToMerge, group.ID) + if len(groupsToMerge) < maxBatchSize && idx < mergeNum-1 { + continue + } + log.Info("merge keyspace groups into the default one", + zap.Int("index", idx), + zap.Int("batch-size", len(groupsToMerge)), + zap.Int("merge-num", mergeNum), + zap.Int("merged-group-num", mergedGroupNum), + zap.Int("unmerged-group-num", unmergedGroupNum)) + // Reach the batch size, merge them into the default 
keyspace group. + if err := m.MergeKeyspaceGroups(utils.DefaultKeyspaceGroupID, groupsToMerge); err != nil { + log.Error("failed to merge all keyspace groups into the default one", + zap.Int("index", idx), + zap.Int("batch-size", len(groupsToMerge)), + zap.Int("merge-num", mergeNum), + zap.Int("merged-group-num", mergedGroupNum), + zap.Int("unmerged-group-num", unmergedGroupNum), + zap.Error(err)) + return err + } + // Wait for the merge to finish. + ctx, cancel := context.WithTimeout(m.ctx, time.Minute) + ticker := time.NewTicker(time.Second) + checkLoop: + for { + select { + case <-ctx.Done(): + log.Info("cancel merging all keyspace groups into the default one", + zap.Int("index", idx), + zap.Int("batch-size", len(groupsToMerge)), + zap.Int("merge-num", mergeNum), + zap.Int("merged-group-num", mergedGroupNum), + zap.Int("unmerged-group-num", unmergedGroupNum)) + cancel() + ticker.Stop() + return nil + case <-ticker.C: + kg, err := m.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) + if err != nil { + log.Error("failed to check the default keyspace group merge state", + zap.Int("index", idx), + zap.Int("batch-size", len(groupsToMerge)), + zap.Int("merge-num", mergeNum), + zap.Int("merged-group-num", mergedGroupNum), + zap.Int("unmerged-group-num", unmergedGroupNum), + zap.Error(err)) + cancel() + ticker.Stop() + return err + } + if !kg.IsMergeTarget() { + break checkLoop + } + } + } + cancel() + ticker.Stop() + mergedGroupNum += len(groupsToMerge) + unmergedGroupNum -= len(groupsToMerge) + groupsToMerge = groupsToMerge[:0] + } + } + log.Info("finish merging all keyspace groups into the default one", + zap.Int("merged-group-num", mergedGroupNum), + zap.Int("unmerged-group-num", unmergedGroupNum)) + return nil +} + // GetKeyspaceGroupPrimaryByID returns the primary node of the keyspace group by ID. 
// MergeKeyspaceGroupsParams defines the params for merging the keyspace groups.
type MergeKeyspaceGroupsParams struct {
	// MergeList holds the IDs of the keyspace groups to merge into the target keyspace group.
	// The handler rejects a request in which it is non-empty while MergeAllIntoDefault is set.
	MergeList           []uint32 `json:"merge-list"`
	// MergeAllIntoDefault asks the handler to merge all other keyspace groups into the
	// default keyspace group instead of merging an explicit MergeList.
	MergeAllIntoDefault bool     `json:"merge-all-into-default"`
}
@@ -292,10 +293,14 @@ func MergeKeyspaceGroups(c *gin.Context) { c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause()) return } - if len(mergeParams.MergeList) == 0 { + if len(mergeParams.MergeList) == 0 && !mergeParams.MergeAllIntoDefault { c.AbortWithStatusJSON(http.StatusBadRequest, "invalid empty merge list") return } + if len(mergeParams.MergeList) > 0 && mergeParams.MergeAllIntoDefault { + c.AbortWithStatusJSON(http.StatusBadRequest, "non-empty merge list when merge all into default") + return + } for _, mergeID := range mergeParams.MergeList { if !isValid(mergeID) { c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id") @@ -310,7 +315,11 @@ func MergeKeyspaceGroups(c *gin.Context) { return } // Merge keyspace group. - err = groupManager.MergeKeyspaceGroups(id, mergeParams.MergeList) + if mergeParams.MergeAllIntoDefault { + err = groupManager.MergeAllIntoDefaultKeyspaceGroup() + } else { + err = groupManager.MergeKeyspaceGroups(id, mergeParams.MergeList) + } if err != nil { c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) return diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index ed4a1b964b0..5a20211c7e9 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -16,6 +16,7 @@ package tso import ( "context" + "math/rand" "strings" "sync" "testing" @@ -28,6 +29,7 @@ import ( pd "github.com/tikv/pd/client" "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/keyspace" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/storage/endpoint" @@ -465,15 +467,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() { re := suite.Require() re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion", 
"return(true)")) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) - kg := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0) - re.Equal(uint32(0), kg.ID) - re.Equal([]uint32{0}, kg.Keyspaces) - re.False(kg.IsSplitting()) // wait for finishing alloc nodes - testutil.Eventually(re, func() bool { - kg = handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0) - return len(kg.Members) == 2 - }) + waitFinishAllocNodes(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID) testConfig := map[string]string{ "config": "1", "tso_keyspace_group_id": "0", @@ -483,15 +478,19 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() { Name: "test_keyspace", Config: testConfig, }) - kg = handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0) - testutil.Eventually(re, func() bool { - kg = handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0) - return len(kg.Members) == 2 - }) + waitFinishAllocNodes(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion")) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) } +func waitFinishAllocNodes(re *require.Assertions, server *tests.TestServer, groupID uint32) { + testutil.Eventually(re, func() bool { + kg := handlersutil.MustLoadKeyspaceGroupByID(re, server, groupID) + re.Equal(groupID, kg.ID) + return len(kg.Members) == mcsutils.DefaultKeyspaceGroupReplicaCount + }) +} + func TestTwiceSplitKeyspaceGroup(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) @@ -749,3 +748,48 @@ func TestGetTSOImmediately(t *testing.T) { re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller")) } + +func (suite *tsoKeyspaceGroupManagerTestSuite) 
TestKeyspaceGroupMergeIntoDefault() { + re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) + + var ( + keyspaceGroupNum = keyspace.MaxEtcdTxnOps + keyspaceGroups = make([]*endpoint.KeyspaceGroup, 0, keyspaceGroupNum) + keyspaces = make([]uint32, 0, keyspaceGroupNum) + ) + for i := 1; i <= keyspaceGroupNum; i++ { + keyspaceGroups = append(keyspaceGroups, &endpoint.KeyspaceGroup{ + ID: uint32(i), + UserKind: endpoint.UserKind(rand.Intn(int(endpoint.UserKindCount))).String(), + Keyspaces: []uint32{uint32(i)}, + }) + keyspaces = append(keyspaces, uint32(i)) + if len(keyspaceGroups) < keyspace.MaxEtcdTxnOps/2 && i != keyspaceGroupNum { + continue + } + handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ + KeyspaceGroups: keyspaceGroups, + }) + keyspaceGroups = keyspaceGroups[:0] + } + // Check if all the keyspace groups are created. + groups := handlersutil.MustLoadKeyspaceGroups(re, suite.pdLeaderServer, "0", "0") + re.Len(groups, keyspaceGroupNum+1) + // Wait for all the keyspace groups to be served. + svr := suite.tsoCluster.WaitForDefaultPrimaryServing(re) + re.NotNil(svr) + svr = suite.tsoCluster.WaitForPrimaryServing(re, uint32(keyspaceGroupNum), uint32(keyspaceGroupNum)) + re.NotNil(svr) + // Merge all the keyspace groups into the default keyspace group. + handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{ + MergeAllIntoDefault: true, + }) + // Wait for all the keyspace groups to be merged. + waitFinishMerge(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, keyspaces) + // Check if all the keyspace groups are merged. + groups = handlersutil.MustLoadKeyspaceGroups(re, suite.pdLeaderServer, "0", "0") + re.Len(groups, 1) + + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) +}