From a3685b9d3d38e337c2ffd16fb72402fc5fb9c7ac Mon Sep 17 00:00:00 2001 From: husharp Date: Tue, 21 May 2024 11:57:13 +0800 Subject: [PATCH 1/6] add alloc Signed-off-by: husharp --- .../mcs/tso/keyspace_group_manager_test.go | 157 ++++++++++-------- 1 file changed, 88 insertions(+), 69 deletions(-) diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 909972f0315..7cc45011707 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -16,6 +16,7 @@ package tso import ( "context" + "github.com/tikv/pd/pkg/mock/mockid" "math/rand" "net/http" "strings" @@ -56,6 +57,13 @@ type tsoKeyspaceGroupManagerTestSuite struct { pdLeaderServer *tests.TestServer // tsoCluster is the TSO service cluster. tsoCluster *tests.TestTSOCluster + + allocator *mockid.IDAllocator +} + +func (suite *tsoKeyspaceGroupManagerTestSuite) alloc() uint32 { + id, _ := suite.allocator.Alloc() + return uint32(id) } func TestTSOKeyspaceGroupManager(t *testing.T) { @@ -77,6 +85,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) SetupSuite() { re.NoError(suite.pdLeaderServer.BootstrapCluster()) suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 2, suite.pdLeaderServer.GetAddr()) re.NoError(err) + suite.allocator = mockid.NewIDAllocator() } func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownSuite() { @@ -166,9 +175,9 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe keyspaceGroupID uint32 keyspaceIDs []uint32 }{ - {0, []uint32{0, 10}}, - {1, []uint32{1, 11}}, - {2, []uint32{2, 12}}, + {suite.alloc(), []uint32{0, 10}}, + {suite.alloc(), []uint32{1, 11}}, + {suite.alloc(), []uint32{2, 12}}, } for _, param := range params { @@ -242,51 +251,53 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { re 
:= suite.Require() - // Create the keyspace group 1 with keyspaces [111, 222, 333]. + // Create the keyspace group `oldID` with keyspaces [111, 222, 333]. + oldID := suite.alloc() handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: []*endpoint.KeyspaceGroup{ { - ID: 1, + ID: oldID, UserKind: endpoint.Standard.String(), Members: suite.tsoCluster.GetKeyspaceGroupMember(), Keyspaces: []uint32{111, 222, 333}, }, }, }) - kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1) - re.Equal(uint32(1), kg1.ID) + kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, oldID) + re.Equal(oldID, kg1.ID) re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces) re.False(kg1.IsSplitting()) - // Get a TSO from the keyspace group 1. + // Get a TSO from the keyspace group `oldID`. var ( ts pdpb.Timestamp err error ) testutil.Eventually(re, func() bool { - ts, err = suite.requestTSO(re, 222, 1) + ts, err = suite.requestTSO(re, 222, oldID) return err == nil && tsoutil.CompareTimestamp(&ts, &pdpb.Timestamp{}) > 0 }) ts.Physical += time.Hour.Milliseconds() - // Set the TSO of the keyspace group 1 to a large value. - err = suite.tsoCluster.GetPrimaryServer(222, 1).ResetTS(tsoutil.GenerateTS(&ts), false, true, 1) + // Set the TSO of the keyspace group `oldID` to a large value. + err = suite.tsoCluster.GetPrimaryServer(222, oldID).ResetTS(tsoutil.GenerateTS(&ts), false, true, oldID) re.NoError(err) - // Split the keyspace group 1 to 2. - handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{ - NewID: 2, + // Split the keyspace group `oldID` to `newID`. + newID := suite.alloc() + handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, oldID, &handlers.SplitKeyspaceGroupByIDParams{ + NewID: newID, Keyspaces: []uint32{222, 333}, }) // Wait for the split to complete automatically even there is no TSO request from the outside. 
testutil.Eventually(re, func() bool { - kg2, code := handlersutil.TryLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 2) + kg2, code := handlersutil.TryLoadKeyspaceGroupByID(re, suite.pdLeaderServer, newID) if code != http.StatusOK { return false } - re.Equal(uint32(2), kg2.ID) + re.Equal(newID, kg2.ID) re.Equal([]uint32{222, 333}, kg2.Keyspaces) return !kg2.IsSplitting() }) - // Check the split TSO from keyspace group 2 now. - splitTS, err := suite.requestTSO(re, 222, 2) + // Check the split TSO from keyspace group `newID` now. + splitTS, err := suite.requestTSO(re, 222, newID) re.NoError(err) re.Greater(tsoutil.CompareTimestamp(&splitTS, &ts), 0) } @@ -304,60 +315,62 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) requestTSO( func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitElection() { re := suite.Require() - // Create the keyspace group 1 with keyspaces [111, 222, 333]. + // Create the keyspace group `oldID` with keyspaces [111, 222, 333]. + oldID := suite.alloc() handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: []*endpoint.KeyspaceGroup{ { - ID: 1, + ID: oldID, UserKind: endpoint.Standard.String(), Members: suite.tsoCluster.GetKeyspaceGroupMember(), Keyspaces: []uint32{111, 222, 333}, }, }, }) - kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1) - re.Equal(uint32(1), kg1.ID) + kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, oldID) + re.Equal(oldID, kg1.ID) re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces) re.False(kg1.IsSplitting()) - // Split the keyspace group 1 to 2. - handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{ - NewID: 2, + // Split the keyspace group `oldID` to `newID`. 
+ newID := suite.alloc() + handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, oldID, &handlers.SplitKeyspaceGroupByIDParams{ + NewID: newID, Keyspaces: []uint32{222, 333}, }) - kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 2) - re.Equal(uint32(2), kg2.ID) + kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, newID) + re.Equal(newID, kg2.ID) re.Equal([]uint32{222, 333}, kg2.Keyspaces) re.True(kg2.IsSplitTarget()) // Check the leadership. - member1, err := suite.tsoCluster.WaitForPrimaryServing(re, 111, 1).GetMember(111, 1) + member1, err := suite.tsoCluster.WaitForPrimaryServing(re, 111, oldID).GetMember(111, oldID) re.NoError(err) re.NotNil(member1) - member2, err := suite.tsoCluster.WaitForPrimaryServing(re, 222, 2).GetMember(222, 2) + member2, err := suite.tsoCluster.WaitForPrimaryServing(re, 222, newID).GetMember(222, newID) re.NoError(err) re.NotNil(member2) - // Wait for the leader of the keyspace group 1 and 2 to be elected. + // Wait for the leader of the keyspace group `oldID` and `newID` to be elected. testutil.Eventually(re, func() bool { return len(member1.GetLeaderListenUrls()) > 0 && len(member2.GetLeaderListenUrls()) > 0 }) - // Check if the leader of the keyspace group 1 and 2 are the same. + // Check if the leader of the keyspace group `oldID` and `newID` are the same. re.Equal(member1.GetLeaderListenUrls(), member2.GetLeaderListenUrls()) - // Resign and block the leader of the keyspace group 1 from being elected. + // Resign and block the leader of the keyspace group `oldID` from being elected. member1.(*member.Participant).SetCampaignChecker(func(*election.Leadership) bool { return false }) member1.ResetLeader() - // The leader of the keyspace group 2 should be resigned also. + // The leader of the keyspace group `newID` should be resigned also. 
testutil.Eventually(re, func() bool { return member2.IsLeader() == false }) - // Check if the leader of the keyspace group 1 and 2 are the same again. + // Check if the leader of the keyspace group `oldID` and `newID` are the same again. member1.(*member.Participant).SetCampaignChecker(nil) testutil.Eventually(re, func() bool { return len(member1.GetLeaderListenUrls()) > 0 && len(member2.GetLeaderListenUrls()) > 0 }) re.Equal(member1.GetLeaderListenUrls(), member2.GetLeaderListenUrls()) // Wait for the keyspace groups to finish the split. - waitFinishSplit(re, suite.pdLeaderServer, 1, 2, []uint32{111}, []uint32{222, 333}) + waitFinishSplit(re, suite.pdLeaderServer, oldID, newID, []uint32{111}, []uint32{222, 333}) } func waitFinishSplit( @@ -390,30 +403,32 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() re := suite.Require() // Enable the failpoint to slow down the system time to test whether the TSO is monotonic. re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/systemTimeSlow", `return(true)`)) - // Create the keyspace group 1 with keyspaces [444, 555, 666]. + // Create the keyspace group `oldID` with keyspaces [444, 555, 666]. + oldID := suite.alloc() handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: []*endpoint.KeyspaceGroup{ { - ID: 1, + ID: oldID, UserKind: endpoint.Standard.String(), Members: suite.tsoCluster.GetKeyspaceGroupMember(), Keyspaces: []uint32{444, 555, 666}, }, }, }) - kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1) - re.Equal(uint32(1), kg1.ID) + kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, oldID) + re.Equal(oldID, kg1.ID) re.Equal([]uint32{444, 555, 666}, kg1.Keyspaces) re.False(kg1.IsSplitting()) // Request the TSO for keyspace 555 concurrently via client. - cancel := suite.dispatchClient(re, 555, 1) - // Split the keyspace group 1 to 2. 
- handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{ - NewID: 2, + cancel := suite.dispatchClient(re, 555, oldID) + // Split the keyspace group `oldID` to `newID`. + newID := suite.alloc() + handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, oldID, &handlers.SplitKeyspaceGroupByIDParams{ + NewID: newID, Keyspaces: []uint32{555, 666}, }) // Wait for the keyspace groups to finish the split. - waitFinishSplit(re, suite.pdLeaderServer, 1, 2, []uint32{444}, []uint32{555, 666}) + waitFinishSplit(re, suite.pdLeaderServer, oldID, newID, []uint32{444}, []uint32{555, 666}) // Stop the client. cancel() re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/systemTimeSlow")) @@ -569,48 +584,49 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) { func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMerge() { re := suite.Require() - // Create the keyspace group 1 and 2 with keyspaces [111, 222] and [333]. + // Create the keyspace group `firstID` and `secondID` with keyspaces [111, 222] and [333]. + firstID, secondID := suite.alloc(), suite.alloc() handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: []*endpoint.KeyspaceGroup{ { - ID: 1, + ID: firstID, UserKind: endpoint.Standard.String(), Members: suite.tsoCluster.GetKeyspaceGroupMember(), Keyspaces: []uint32{111, 222}, }, { - ID: 2, + ID: secondID, UserKind: endpoint.Standard.String(), Members: suite.tsoCluster.GetKeyspaceGroupMember(), Keyspaces: []uint32{333}, }, }, }) - // Get a TSO from the keyspace group 1. + // Get a TSO from the keyspace group `firstID`. 
var ( ts pdpb.Timestamp err error ) testutil.Eventually(re, func() bool { - ts, err = suite.requestTSO(re, 222, 1) + ts, err = suite.requestTSO(re, 222, firstID) return err == nil && tsoutil.CompareTimestamp(&ts, &pdpb.Timestamp{}) > 0 }) ts.Physical += time.Hour.Milliseconds() - // Set the TSO of the keyspace group 1 to a large value. - err = suite.tsoCluster.GetPrimaryServer(222, 1).ResetTS(tsoutil.GenerateTS(&ts), false, true, 1) + // Set the TSO of the keyspace group `firstID` to a large value. + err = suite.tsoCluster.GetPrimaryServer(222, firstID).ResetTS(tsoutil.GenerateTS(&ts), false, true, firstID) re.NoError(err) - // Merge the keyspace group 1 and 2 to the default keyspace group. + // Merge the keyspace group `firstID` and `secondID` to the default keyspace group. handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{ - MergeList: []uint32{1, 2}, + MergeList: []uint32{firstID, secondID}, }) - // Check the keyspace group 1 and 2 are merged to the default keyspace group. + // Check the keyspace group `firstID` and `secondID` are merged to the default keyspace group. kg := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID) re.Equal(mcsutils.DefaultKeyspaceGroupID, kg.ID) for _, keyspaceID := range []uint32{111, 222, 333} { re.Contains(kg.Keyspaces, keyspaceID) } re.True(kg.IsMergeTarget()) - // Check the merged TSO from the default keyspace group is greater than the TSO from the keyspace group 1. + // Check the merged TSO from the default keyspace group is greater than the TSO from the keyspace group `firstID`.
var mergedTS pdpb.Timestamp testutil.Eventually(re, func() bool { mergedTS, err = suite.requestTSO(re, 333, mcsutils.DefaultKeyspaceGroupID) @@ -624,26 +640,27 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMerge() { func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeClient() { re := suite.Require() - // Create the keyspace group 1 with keyspaces [111, 222, 333]. + // Create the keyspace group `id` with keyspaces [111, 222, 333]. + id := suite.alloc() handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: []*endpoint.KeyspaceGroup{ { - ID: 1, + ID: id, UserKind: endpoint.Standard.String(), Members: suite.tsoCluster.GetKeyspaceGroupMember(), Keyspaces: []uint32{111, 222, 333}, }, }, }) - kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1) - re.Equal(uint32(1), kg1.ID) + kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, id) + re.Equal(id, kg1.ID) re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces) re.False(kg1.IsMerging()) // Request the TSO for keyspace 222 concurrently via client. - cancel := suite.dispatchClient(re, 222, 1) + cancel := suite.dispatchClient(re, 222, id) // Merge the keyspace group 1 to the default keyspace group. handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{ - MergeList: []uint32{1}, + MergeList: []uint32{id}, }) // Wait for the default keyspace group to finish the merge. waitFinishMerge(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, []uint32{111, 222, 333}) @@ -671,24 +688,25 @@ func waitFinishMerge( func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeBeforeInitTSO() { re := suite.Require() - // Make sure the TSO of keyspace group 1 won't be initialized before it's merged. + // Make sure the TSO of keyspace group `id` won't be initialized before it's merged. 
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/failedToSaveTimestamp", `return(true)`)) // Request the TSO for the default keyspace concurrently via client. + id := suite.alloc() cancel := suite.dispatchClient(re, mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID) // Create the keyspace group 1 with keyspaces [111, 222, 333]. handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: []*endpoint.KeyspaceGroup{ { - ID: 1, + ID: id, UserKind: endpoint.Standard.String(), Members: suite.tsoCluster.GetKeyspaceGroupMember(), Keyspaces: []uint32{111, 222, 333}, }, }, }) - // Merge the keyspace group 1 to the default keyspace group. + // Merge the keyspace group `id` to the default keyspace group. handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{ - MergeList: []uint32{1}, + MergeList: []uint32{id}, }) // Wait for the default keyspace group to finish the merge. waitFinishMerge(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, []uint32{111, 222, 333}) @@ -775,12 +793,13 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspaceGroupMergeIntoDefault keyspaces = make([]uint32, 0, keyspaceGroupNum) ) for i := 1; i <= keyspaceGroupNum; i++ { + id := suite.alloc() keyspaceGroups = append(keyspaceGroups, &endpoint.KeyspaceGroup{ - ID: uint32(i), + ID: id, UserKind: endpoint.UserKind(rand.Intn(int(endpoint.UserKindCount))).String(), - Keyspaces: []uint32{uint32(i)}, + Keyspaces: []uint32{id}, }) - keyspaces = append(keyspaces, uint32(i)) + keyspaces = append(keyspaces, id) if i != keyspaceGroupNum { continue } @@ -797,7 +816,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspaceGroupMergeIntoDefault re.NotNil(svr) for i := 1; i < keyspaceGroupNum; i++ { // Check if the keyspace group is served. 
- svr = suite.tsoCluster.WaitForPrimaryServing(re, uint32(i), uint32(i)) + svr = suite.tsoCluster.WaitForPrimaryServing(re, keyspaceGroups[i].ID, keyspaceGroups[i].ID) re.NotNil(svr) } // Merge all the keyspace groups into the default keyspace group. From 5bd948d24509a0c98ca28393e75e20a8615ff4ad Mon Sep 17 00:00:00 2001 From: husharp Date: Tue, 21 May 2024 12:00:38 +0800 Subject: [PATCH 2/6] add alloc Signed-off-by: husharp --- tests/integrations/mcs/tso/keyspace_group_manager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 7cc45011707..4b50228f564 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -16,7 +16,6 @@ package tso import ( "context" - "github.com/tikv/pd/pkg/mock/mockid" "math/rand" "net/http" "strings" @@ -33,6 +32,7 @@ import ( "github.com/tikv/pd/pkg/errs" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" + "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/pkg/storage/endpoint" tsopkg "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/etcdutil" From cb967b7e5e8e4dd79d339ae820717adb53287e29 Mon Sep 17 00:00:00 2001 From: husharp Date: Tue, 21 May 2024 15:02:58 +0800 Subject: [PATCH 3/6] address comment and change base Signed-off-by: husharp --- .../mcs/tso/keyspace_group_manager_test.go | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 4b50228f564..f7b892ce77d 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -61,7 +61,7 @@ type tsoKeyspaceGroupManagerTestSuite struct { allocator *mockid.IDAllocator } -func (suite 
*tsoKeyspaceGroupManagerTestSuite) alloc() uint32 { +func (suite *tsoKeyspaceGroupManagerTestSuite) allocID() uint32 { id, _ := suite.allocator.Alloc() return uint32(id) } @@ -86,6 +86,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) SetupSuite() { suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 2, suite.pdLeaderServer.GetAddr()) re.NoError(err) suite.allocator = mockid.NewIDAllocator() + suite.allocator.SetBase(uint64(time.Now().Second())) } func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownSuite() { @@ -175,9 +176,9 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe keyspaceGroupID uint32 keyspaceIDs []uint32 }{ - {suite.alloc(), []uint32{0, 10}}, - {suite.alloc(), []uint32{1, 11}}, - {suite.alloc(), []uint32{2, 12}}, + {suite.allocID(), []uint32{0, 10}}, + {suite.allocID(), []uint32{1, 11}}, + {suite.allocID(), []uint32{2, 12}}, } for _, param := range params { @@ -252,7 +253,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { re := suite.Require() // Create the keyspace group `oldID` with keyspaces [111, 222, 333]. - oldID := suite.alloc() + oldID := suite.allocID() handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: []*endpoint.KeyspaceGroup{ { @@ -281,7 +282,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { err = suite.tsoCluster.GetPrimaryServer(222, oldID).ResetTS(tsoutil.GenerateTS(&ts), false, true, oldID) re.NoError(err) // Split the keyspace group `oldID` to `newID`. 
- newID := suite.alloc() + newID := suite.allocID() handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, oldID, &handlers.SplitKeyspaceGroupByIDParams{ NewID: newID, Keyspaces: []uint32{222, 333}, @@ -316,7 +317,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) requestTSO( func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitElection() { re := suite.Require() // Create the keyspace group `oldID` with keyspaces [111, 222, 333]. - oldID := suite.alloc() + oldID := suite.allocID() handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: []*endpoint.KeyspaceGroup{ { @@ -332,7 +333,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitElection re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces) re.False(kg1.IsSplitting()) // Split the keyspace group `oldID` to `newID`. - newID := suite.alloc() + newID := suite.allocID() handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, oldID, &handlers.SplitKeyspaceGroupByIDParams{ NewID: newID, Keyspaces: []uint32{222, 333}, @@ -404,7 +405,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() // Enable the failpoint to slow down the system time to test whether the TSO is monotonic. re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/systemTimeSlow", `return(true)`)) // Create the keyspace group `oldID` with keyspaces [444, 555, 666]. - oldID := suite.alloc() + oldID := suite.allocID() handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: []*endpoint.KeyspaceGroup{ { @@ -422,7 +423,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() // Request the TSO for keyspace 555 concurrently via client. cancel := suite.dispatchClient(re, 555, oldID) // Split the keyspace group `oldID` to `newID`. 
- newID := suite.alloc() + newID := suite.allocID() handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, oldID, &handlers.SplitKeyspaceGroupByIDParams{ NewID: newID, Keyspaces: []uint32{555, 666}, @@ -585,7 +586,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) { func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMerge() { re := suite.Require() // Create the keyspace group `firstID` and `secondID` with keyspaces [111, 222] and [333]. - firstID, secondID := suite.alloc(), suite.alloc() + firstID, secondID := suite.allocID(), suite.allocID() handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: []*endpoint.KeyspaceGroup{ { @@ -641,7 +642,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMerge() { func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeClient() { re := suite.Require() // Create the keyspace group `id` with keyspaces [111, 222, 333]. - id := suite.alloc() + id := suite.allocID() handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: []*endpoint.KeyspaceGroup{ { @@ -691,7 +692,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeBeforeIn // Make sure the TSO of keyspace group `id` won't be initialized before it's merged. re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/failedToSaveTimestamp", `return(true)`)) // Request the TSO for the default keyspace concurrently via client. - id := suite.alloc() + id := suite.allocID() cancel := suite.dispatchClient(re, mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID) // Create the keyspace group 1 with keyspaces [111, 222, 333]. 
handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ @@ -793,7 +794,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspaceGroupMergeIntoDefault keyspaces = make([]uint32, 0, keyspaceGroupNum) ) for i := 1; i <= keyspaceGroupNum; i++ { - id := suite.alloc() + id := suite.allocID() keyspaceGroups = append(keyspaceGroups, &endpoint.KeyspaceGroup{ ID: id, UserKind: endpoint.UserKind(rand.Intn(int(endpoint.UserKindCount))).String(), From 7731ee4df6098257322a339c102b59655055a19f Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 21 May 2024 17:22:17 +0800 Subject: [PATCH 4/6] pkg: reduce the allocation of observe (#8188) ref tikv/pd#7897 Signed-off-by: Ryan Leung --- pkg/statistics/region_collection.go | 124 +++++++++++++++++----------- 1 file changed, 78 insertions(+), 46 deletions(-) diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go index e4c159cf22d..30197dd43ea 100644 --- a/pkg/statistics/region_collection.go +++ b/pkg/statistics/region_collection.go @@ -222,61 +222,93 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store // Better to make sure once any of these conditions changes, it will trigger the heartbeat `save_cache`. // Otherwise, the state may be out-of-date for a long time, which needs another way to apply the change ASAP. // For example, see `RegionStatsNeedUpdate` above to know how `OversizedRegion` and `UndersizedRegion` are updated. 
- conditions := map[RegionStatisticType]bool{ - MissPeer: len(peers) < desiredReplicas, - ExtraPeer: len(peers) > desiredReplicas, - DownPeer: len(downPeers) > 0, - PendingPeer: len(pendingPeers) > 0, - OfflinePeer: func() bool { - for _, store := range stores { - if store.IsRemoving() { - peer := region.GetStorePeer(store.GetID()) - if peer != nil { - return true - } - } + var conditions RegionStatisticType + if len(peers) < desiredReplicas { + conditions |= MissPeer + } + if len(peers) > desiredReplicas { + conditions |= ExtraPeer + } + if len(downPeers) > 0 { + conditions |= DownPeer + } + if len(pendingPeers) > 0 { + conditions |= PendingPeer + } + for _, store := range stores { + if store.IsRemoving() { + peer := region.GetStorePeer(store.GetID()) + if peer != nil { + conditions |= OfflinePeer + break } - return false - }(), - LearnerPeer: len(learners) > 0, - EmptyRegion: regionSize <= core.EmptyRegionApproximateSize, - OversizedRegion: region.IsOversized(regionMaxSize, regionMaxKeys), - UndersizedRegion: region.NeedMerge(maxMergeRegionSize, maxMergeRegionKeys), - WitnessLeader: leaderIsWitness, + } + } + if len(learners) > 0 { + conditions |= LearnerPeer + } + if regionSize <= core.EmptyRegionApproximateSize { + conditions |= EmptyRegion + } + if region.IsOversized(regionMaxSize, regionMaxKeys) { + conditions |= OversizedRegion + } + if region.NeedMerge(maxMergeRegionSize, maxMergeRegionKeys) { + conditions |= UndersizedRegion + } + if leaderIsWitness { + conditions |= WitnessLeader } // Check if the region meets any of the conditions and update the corresponding info. 
regionID := region.GetID() - for typ, c := range conditions { - if c { - info := r.stats[typ][regionID] - if typ == DownPeer { - if info == nil { - info = &RegionInfoWithTS{} - } - if info.(*RegionInfoWithTS).startDownPeerTS != 0 { - regionDownPeerDuration.Observe(float64(time.Now().Unix() - info.(*RegionInfoWithTS).startDownPeerTS)) + for i := 0; i < len(regionStatisticTypes); i++ { + condition := RegionStatisticType(1 << i) + if conditions&condition == 0 { + continue + } + info := r.stats[condition][regionID] + // The condition is met + switch condition { + case MissPeer: + if info == nil { + info = &RegionInfoWithTS{} + } + if len(voters) < desiredVoters { + if info.(*RegionInfoWithTS).startMissVoterPeerTS != 0 { + regionMissVoterPeerDuration.Observe(float64(time.Now().Unix() - info.(*RegionInfoWithTS).startMissVoterPeerTS)) } else { - info.(*RegionInfoWithTS).startDownPeerTS = time.Now().Unix() - logDownPeerWithNoDisconnectedStore(region, stores) - } - } else if typ == MissPeer { - if info == nil { - info = &RegionInfoWithTS{} - } - if len(voters) < desiredVoters { - if info.(*RegionInfoWithTS).startMissVoterPeerTS != 0 { - regionMissVoterPeerDuration.Observe(float64(time.Now().Unix() - info.(*RegionInfoWithTS).startMissVoterPeerTS)) - } else { - info.(*RegionInfoWithTS).startMissVoterPeerTS = time.Now().Unix() - } + info.(*RegionInfoWithTS).startMissVoterPeerTS = time.Now().Unix() } + } + case DownPeer: + if info == nil { + info = &RegionInfoWithTS{} + } + if info.(*RegionInfoWithTS).startDownPeerTS != 0 { + regionDownPeerDuration.Observe(float64(time.Now().Unix() - info.(*RegionInfoWithTS).startDownPeerTS)) } else { - info = struct{}{} + info.(*RegionInfoWithTS).startDownPeerTS = time.Now().Unix() + logDownPeerWithNoDisconnectedStore(region, stores) } - - r.stats[typ][regionID] = info - peerTypeIndex |= typ + case ExtraPeer: + fallthrough + case PendingPeer: + fallthrough + case OfflinePeer: + fallthrough + case LearnerPeer: + fallthrough + case EmptyRegion: 
+ fallthrough + case OversizedRegion: + fallthrough + case UndersizedRegion: + fallthrough + case WitnessLeader: + info = struct{}{} } + r.stats[condition][regionID] = info + peerTypeIndex |= condition } // Remove the info if any of the conditions are not met any more. if oldIndex, ok := r.index[regionID]; ok && oldIndex > emptyStatistic { From d163cbeb560847536bb0a1ef6258811b03005cbf Mon Sep 17 00:00:00 2001 From: husharp Date: Tue, 21 May 2024 17:51:30 +0800 Subject: [PATCH 5/6] add patrol time Signed-off-by: husharp --- pkg/tso/keyspace_group_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 2930357e2b4..b2af48f08da 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -1439,7 +1439,7 @@ func (kgm *KeyspaceGroupManager) groupSplitPatroller() { defer kgm.wg.Done() patrolInterval := groupPatrolInterval failpoint.Inject("fastGroupSplitPatroller", func() { - patrolInterval = time.Second + patrolInterval = 3 * time.Second }) ticker := time.NewTicker(patrolInterval) defer ticker.Stop() From 58e7580209f001248c3d530ef2d315ab3c6fd767 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 21 May 2024 18:11:47 +0800 Subject: [PATCH 6/6] *: use a separate runner for updating subtree (#8158) ref tikv/pd#7897 Signed-off-by: Ryan Leung --- metrics/grafana/pd.json | 10 ++-- pkg/core/context.go | 2 + pkg/core/region.go | 10 ++-- pkg/core/region_test.go | 6 +-- pkg/mcs/scheduling/server/cluster.go | 45 ++++++++++-------- pkg/ratelimit/metrics.go | 32 +++++++++---- pkg/ratelimit/runner.go | 70 ++++++++++++++++------------ pkg/syncer/client.go | 2 +- server/cluster/cluster.go | 58 ++++++++++++----------- server/cluster/cluster_worker.go | 8 ++-- 10 files changed, 142 insertions(+), 101 deletions(-) diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index e6d314c2e00..54a047e612e 100644 --- a/metrics/grafana/pd.json +++ 
b/metrics/grafana/pd.json @@ -11651,12 +11651,12 @@ "targets": [ { "exemplar": true, - "expr": "pd_ratelimit_runner_task_pending_tasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}", + "expr": "pd_ratelimit_runner_pending_tasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}", "format": "time_series", "hide": false, "interval": "", "intervalFactor": 2, - "legendFormat": "{{task_type}}_({{runner_name}})", + "legendFormat": "{{task_type}}_{{runner_name}}", "refId": "A", "step": 4 } @@ -11768,12 +11768,12 @@ "targets": [ { "exemplar": true, - "expr": "rate(pd_ratelimit_runner_task_failed_tasks_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])*60", + "expr": "rate(pd_ratelimit_runner_failed_tasks_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])*60", "format": "time_series", "hide": false, "interval": "", "intervalFactor": 2, - "legendFormat": "failed-tasks-({{runner_name}})", + "legendFormat": "failed-tasks-{{runner_name}}", "refId": "A", "step": 4 }, @@ -11782,7 +11782,7 @@ "expr": "pd_ratelimit_runner_task_max_waiting_duration_seconds{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}", "hide": false, "interval": "", - "legendFormat": "max-wait-duration-({{runner_name}})", + "legendFormat": "max-wait-duration-{{runner_name}}", "refId": "B" } ], diff --git a/pkg/core/context.go b/pkg/core/context.go index a0f51e55680..7410f8394c2 100644 --- a/pkg/core/context.go +++ b/pkg/core/context.go @@ -25,6 +25,7 @@ type MetaProcessContext struct { context.Context Tracer RegionHeartbeatProcessTracer TaskRunner ratelimit.Runner + MiscRunner ratelimit.Runner LogRunner ratelimit.Runner } @@ -35,6 +36,7 @@ func ContextTODO() *MetaProcessContext { Context: context.TODO(), Tracer: NewNoopHeartbeatProcessTracer(), TaskRunner: ratelimit.NewSyncRunner(), + MiscRunner: ratelimit.NewSyncRunner(), LogRunner: ratelimit.NewSyncRunner(), // Limit default is nil } diff --git a/pkg/core/region.go 
b/pkg/core/region.go index c9a8455d4de..a1a61d505a9 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -729,7 +729,7 @@ func (r *RegionInfo) isRegionRecreated() bool { // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin // and new region information. -type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) +type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, retained bool) // GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function. // nil means do not print the log. @@ -742,7 +742,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { } // Save to storage if meta is updated. // Save to cache if meta or leader is updated, or contains any down/pending peer. - return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) { + return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, retained bool) { logRunner := ctx.LogRunner // print log asynchronously debug, info := d, i @@ -772,7 +772,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("region-id", region.GetID()), logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta()))) } - saveKV, saveCache = true, true + saveKV, saveCache, retained = true, true, true } else { r := region.GetRegionEpoch() o := origin.GetRegionEpoch() @@ -785,7 +785,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("new-version", r.GetVersion()), ) } - saveKV, saveCache = true, true + saveKV, saveCache, retained = true, true, true } if r.GetConfVer() > o.GetConfVer() { if log.GetLevel() <= zap.InfoLevel { @@ -796,7 +796,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("new-confver", r.GetConfVer()), ) } - saveKV, 
saveCache = true, true + saveKV, saveCache, retained = true, true, true } if region.GetLeader().GetId() != origin.GetLeader().GetId() { if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel { diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go index 1b8f20cf9b2..b09c1dfd601 100644 --- a/pkg/core/region_test.go +++ b/pkg/core/region_test.go @@ -363,7 +363,7 @@ func TestNeedSync(t *testing.T) { for _, testCase := range testCases { regionA := region.Clone(testCase.optionsA...) regionB := region.Clone(testCase.optionsB...) - _, _, needSync := RegionGuide(ContextTODO(), regionA, regionB) + _, _, needSync, _ := RegionGuide(ContextTODO(), regionA, regionB) re.Equal(testCase.needSync, needSync) } } @@ -1031,7 +1031,7 @@ func TestUpdateRegionEventualConsistency(t *testing.T) { regionsOld.AtomicCheckAndPutRegion(ctx, regionPendingItemA) re.Equal(int32(2), regionPendingItemA.GetRef()) // check new item - saveKV, saveCache, needSync := regionGuide(ctx, regionItemA, regionPendingItemA) + saveKV, saveCache, needSync, _ := regionGuide(ctx, regionItemA, regionPendingItemA) re.True(needSync) re.True(saveCache) re.False(saveKV) @@ -1060,7 +1060,7 @@ func TestUpdateRegionEventualConsistency(t *testing.T) { re.Equal(int32(1), regionPendingItemB.GetRef()) // heartbeat again, no need updates root tree - saveKV, saveCache, needSync := regionGuide(ctx, regionItemB, regionItemB) + saveKV, saveCache, needSync, _ := regionGuide(ctx, regionItemB, regionItemB) re.False(needSync) re.False(saveCache) re.False(saveKV) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index d3691516868..c6c365b03ad 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -54,8 +54,12 @@ type Cluster struct { clusterID uint64 running atomic.Bool - heartbeatRunnner ratelimit.Runner - logRunner ratelimit.Runner + // heartbeatRunner is used to process the subtree update task asynchronously. 
+ heartbeatRunner ratelimit.Runner + // miscRunner is used to process the statistics and persistent tasks asynchronously. + miscRunner ratelimit.Runner + // logRunner is used to process the log asynchronously. + logRunner ratelimit.Runner } const ( @@ -64,8 +68,9 @@ const ( collectWaitTime = time.Minute // heartbeat relative const - heartbeatTaskRunner = "heartbeat-task-runner" - logTaskRunner = "log-task-runner" + heartbeatTaskRunner = "heartbeat-task-runner" + statisticsTaskRunner = "statistics-task-runner" + logTaskRunner = "log-task-runner" ) var syncRunner = ratelimit.NewSyncRunner() @@ -93,8 +98,9 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, clusterID: clusterID, checkMembershipCh: checkMembershipCh, - heartbeatRunnner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), - logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + miscRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), } c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams) err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel()) @@ -531,7 +537,8 @@ func (c *Cluster) StartBackgroundJobs() { go c.runUpdateStoreStats() go c.runCoordinator() go c.runMetricsCollectionJob() - c.heartbeatRunnner.Start() + c.heartbeatRunner.Start() + c.miscRunner.Start() c.logRunner.Start() c.running.Store(true) } @@ -543,7 +550,8 @@ func (c *Cluster) StopBackgroundJobs() { } c.running.Store(false) 
c.coordinator.Stop() - c.heartbeatRunnner.Stop() + c.heartbeatRunner.Stop() + c.miscRunner.Stop() c.logRunner.Stop() c.cancel() c.wg.Wait() @@ -560,16 +568,18 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error { if c.persistConfig.GetScheduleConfig().EnableHeartbeatBreakdownMetrics { tracer = core.NewHeartbeatProcessTracer() } - var taskRunner, logRunner ratelimit.Runner - taskRunner, logRunner = syncRunner, syncRunner + var taskRunner, miscRunner, logRunner ratelimit.Runner + taskRunner, miscRunner, logRunner = syncRunner, syncRunner, syncRunner if c.persistConfig.GetScheduleConfig().EnableHeartbeatConcurrentRunner { - taskRunner = c.heartbeatRunnner + taskRunner = c.heartbeatRunner + miscRunner = c.miscRunner logRunner = c.logRunner } ctx := &core.MetaProcessContext{ Context: c.ctx, Tracer: tracer, TaskRunner: taskRunner, + MiscRunner: miscRunner, LogRunner: logRunner, } tracer.Begin() @@ -591,19 +601,12 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c return err } region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) - - ctx.TaskRunner.RunTask( - ctx, - ratelimit.HandleStatsAsync, - func(_ context.Context) { - cluster.HandleStatsAsync(c, region) - }, - ) + cluster.HandleStatsAsync(c, region) tracer.OnAsyncHotStatsFinished() hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. 
- _, saveCache, _ := core.GenerateRegionGuideFunc(true)(ctx, region, origin) + _, saveCache, _, retained := core.GenerateRegionGuideFunc(true)(ctx, region, origin) if !saveCache { // Due to some config changes need to update the region stats as well, @@ -627,6 +630,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { c.CheckAndPutSubTree(region) }, + ratelimit.WithRetained(true), ) } return nil @@ -650,6 +654,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { c.CheckAndPutSubTree(region) }, + ratelimit.WithRetained(retained), ) tracer.OnUpdateSubTreeFinished() ctx.TaskRunner.RunTask( diff --git a/pkg/ratelimit/metrics.go b/pkg/ratelimit/metrics.go index 5d4443a1cc4..c5510e66b26 100644 --- a/pkg/ratelimit/metrics.go +++ b/pkg/ratelimit/metrics.go @@ -31,25 +31,41 @@ var ( Name: "runner_task_max_waiting_duration_seconds", Help: "The duration of tasks waiting in the runner.", }, []string{nameStr}) - - RunnerTaskPendingTasks = prometheus.NewGaugeVec( + RunnerPendingTasks = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "pd", Subsystem: "ratelimit", - Name: "runner_task_pending_tasks", + Name: "runner_pending_tasks", Help: "The number of pending tasks in the runner.", }, []string{nameStr, taskStr}) - RunnerTaskFailedTasks = prometheus.NewCounterVec( + RunnerFailedTasks = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "pd", Subsystem: "ratelimit", - Name: "runner_task_failed_tasks_total", + Name: "runner_failed_tasks_total", Help: "The number of failed tasks in the runner.", - }, []string{nameStr}) + }, []string{nameStr, taskStr}) + RunnerSucceededTasks = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "ratelimit", + Name: "runner_success_tasks_total", + Help: "The number of tasks in the runner.", + }, []string{nameStr, taskStr}) + RunnerTaskExecutionDuration = prometheus.NewHistogramVec( + 
prometheus.HistogramOpts{ + Namespace: "pd", + Subsystem: "ratelimit", + Name: "runner_task_execution_duration_seconds", + Help: "Bucketed histogram of processing time (s) of finished tasks.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13), + }, []string{nameStr, taskStr}) ) func init() { prometheus.MustRegister(RunnerTaskMaxWaitingDuration) - prometheus.MustRegister(RunnerTaskPendingTasks) - prometheus.MustRegister(RunnerTaskFailedTasks) + prometheus.MustRegister(RunnerPendingTasks) + prometheus.MustRegister(RunnerFailedTasks) + prometheus.MustRegister(RunnerTaskExecutionDuration) + prometheus.MustRegister(RunnerSucceededTasks) } diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 07233af238b..17a45067f3d 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -35,7 +35,10 @@ const ( SaveRegionToKV = "SaveRegionToKV" ) -const initialCapacity = 100 +const ( + initialCapacity = 10000 + maxPendingTaskNum = 20000000 +) // Runner is the interface for running tasks. type Runner interface { @@ -48,9 +51,10 @@ type Runner interface { type Task struct { ctx context.Context submittedAt time.Time - opts *TaskOpts f func(context.Context) name string + // retained indicates whether the task is exempt from being rejected when the oldest pending task has waited longer than maxPendingDuration. + retained bool } // ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum.
@@ -67,7 +71,6 @@ type ConcurrentRunner struct { stopChan chan struct{} wg sync.WaitGroup pendingTaskCount map[string]int64 - failedTaskCount prometheus.Counter maxWaitingDuration prometheus.Gauge } @@ -79,18 +82,19 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur maxPendingDuration: maxPendingDuration, taskChan: make(chan *Task), pendingTasks: make([]*Task, 0, initialCapacity), - failedTaskCount: RunnerTaskFailedTasks.WithLabelValues(name), pendingTaskCount: make(map[string]int64), maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), } return s } -// TaskOpts is the options for RunTask. -type TaskOpts struct{} - // TaskOption configures TaskOp -type TaskOption func(opts *TaskOpts) +type TaskOption func(opts *Task) + +// WithRetained sets whether the task should be retained. +func WithRetained(retained bool) TaskOption { + return func(opts *Task) { opts.retained = retained } +} // Start starts the runner. func (cr *ConcurrentRunner) Start() { @@ -123,8 +127,8 @@ func (cr *ConcurrentRunner) Start() { if len(cr.pendingTasks) > 0 { maxDuration = time.Since(cr.pendingTasks[0].submittedAt) } - for name, cnt := range cr.pendingTaskCount { - RunnerTaskPendingTasks.WithLabelValues(cr.name, name).Set(float64(cnt)) + for taskName, cnt := range cr.pendingTaskCount { + RunnerPendingTasks.WithLabelValues(cr.name, taskName).Set(float64(cnt)) } cr.pendingMu.Unlock() cr.maxWaitingDuration.Set(maxDuration.Seconds()) @@ -134,26 +138,28 @@ func (cr *ConcurrentRunner) Start() { } func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) { + start := time.Now() task.f(task.ctx) if token != nil { cr.limiter.ReleaseToken(token) cr.processPendingTasks() } + RunnerTaskExecutionDuration.WithLabelValues(cr.name, task.name).Observe(time.Since(start).Seconds()) + RunnerSucceededTasks.WithLabelValues(cr.name, task.name).Inc() } func (cr *ConcurrentRunner) processPendingTasks() { cr.pendingMu.Lock() defer cr.pendingMu.Unlock() - for 
len(cr.pendingTasks) > 0 { + if len(cr.pendingTasks) > 0 { task := cr.pendingTasks[0] select { case cr.taskChan <- task: cr.pendingTasks = cr.pendingTasks[1:] cr.pendingTaskCount[task.name]-- - return default: - return } + return } } @@ -165,34 +171,40 @@ func (cr *ConcurrentRunner) Stop() { // RunTask runs the task asynchronously. func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error { - taskOpts := &TaskOpts{} - for _, opt := range opts { - opt(taskOpts) - } task := &Task{ ctx: ctx, name: name, f: f, - opts: taskOpts, } - + for _, opt := range opts { + opt(task) + } cr.processPendingTasks() - select { - case cr.taskChan <- task: - default: - cr.pendingMu.Lock() - defer cr.pendingMu.Unlock() - if len(cr.pendingTasks) > 0 { + cr.pendingMu.Lock() + defer func() { + cr.pendingMu.Unlock() + cr.processPendingTasks() + }() + + pendingTaskNum := len(cr.pendingTasks) + if pendingTaskNum > 0 { + if !task.retained { maxWait := time.Since(cr.pendingTasks[0].submittedAt) if maxWait > cr.maxPendingDuration { - cr.failedTaskCount.Inc() + RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() return ErrMaxWaitingTasksExceeded } } - task.submittedAt = time.Now() - cr.pendingTasks = append(cr.pendingTasks, task) - cr.pendingTaskCount[task.name]++ + // We use the max task number to limit the memory usage. + // It occupies around 1.5GB memory when there are 20000000 pending tasks.
+ if len(cr.pendingTasks) > maxPendingTaskNum { + RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() + return ErrMaxWaitingTasksExceeded + } } + task.submittedAt = time.Now() + cr.pendingTasks = append(cr.pendingTasks, task) + cr.pendingTaskCount[task.name]++ return nil } diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index 8a2e757d5cd..00fa8dc389b 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -212,7 +212,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { Tracer: core.NewNoopHeartbeatProcessTracer(), // no limit for followers. } - saveKV, _, _ := regionGuide(ctx, region, origin) + saveKV, _, _, _ := regionGuide(ctx, region, origin) overlaps := bc.PutRegion(region) if hasBuckets { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index a8558051dfa..148b43541a2 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -107,8 +107,9 @@ const ( minSnapshotDurationSec = 5 // heartbeat relative const - heartbeatTaskRunner = "heartbeat-async" - logTaskRunner = "log-async" + heartbeatTaskRunner = "heartbeat-async" + statisticsTaskRunner = "statistics-async" + logTaskRunner = "log-async" ) // Server is the interface for cluster. @@ -173,8 +174,12 @@ type RaftCluster struct { independentServices sync.Map hbstreams *hbstream.HeartbeatStreams - heartbeatRunnner ratelimit.Runner - logRunner ratelimit.Runner + // heartbeatRunner is used to process the subtree update task asynchronously. + heartbeatRunner ratelimit.Runner + // miscRunner is used to process the statistics and persistent tasks asynchronously. + miscRunner ratelimit.Runner + // logRunner is used to process the log asynchronously. + logRunner ratelimit.Runner } // Status saves some state information. 
@@ -191,15 +196,16 @@ type Status struct { func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client, httpClient *http.Client) *RaftCluster { return &RaftCluster{ - serverCtx: ctx, - clusterID: clusterID, - regionSyncer: regionSyncer, - httpClient: httpClient, - etcdClient: etcdClient, - core: basicCluster, - storage: storage, - heartbeatRunnner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), - logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + serverCtx: ctx, + clusterID: clusterID, + regionSyncer: regionSyncer, + httpClient: httpClient, + etcdClient: etcdClient, + core: basicCluster, + storage: storage, + heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + miscRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), } } @@ -357,7 +363,8 @@ func (c *RaftCluster) Start(s Server) error { go c.startGCTuner() c.running = true - c.heartbeatRunnner.Start() + c.heartbeatRunner.Start() + c.miscRunner.Start() c.logRunner.Start() return nil } @@ -752,7 +759,8 @@ func (c *RaftCluster) Stop() { if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { c.stopSchedulingJobs() } - c.heartbeatRunnner.Stop() + c.heartbeatRunner.Stop() + c.miscRunner.Stop() c.logRunner.Stop() c.Unlock() @@ -1024,19 +1032,13 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) if 
!c.IsServiceIndependent(mcsutils.SchedulingServiceName) { - ctx.TaskRunner.RunTask( - ctx.Context, - ratelimit.HandleStatsAsync, - func(_ context.Context) { - cluster.HandleStatsAsync(c, region) - }, - ) + cluster.HandleStatsAsync(c, region) } tracer.OnAsyncHotStatsFinished() hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. - saveKV, saveCache, needSync := regionGuide(ctx, region, origin) + saveKV, saveCache, needSync, retained := regionGuide(ctx, region, origin) tracer.OnRegionGuideFinished() if !saveKV && !saveCache { // Due to some config changes need to update the region stats as well, @@ -1045,7 +1047,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // region stats needs to be collected in API mode. // We need to think of a better way to reduce this part of the cost in the future. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { - ctx.TaskRunner.RunTask( + ctx.MiscRunner.RunTask( ctx.Context, ratelimit.ObserveRegionStatsAsync, func(_ context.Context) { @@ -1063,6 +1065,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio func(_ context.Context) { c.CheckAndPutSubTree(region) }, + ratelimit.WithRetained(true), ) } return nil @@ -1090,11 +1093,12 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio func(_ context.Context) { c.CheckAndPutSubTree(region) }, + ratelimit.WithRetained(retained), ) tracer.OnUpdateSubTreeFinished() if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { - ctx.TaskRunner.RunTask( + ctx.MiscRunner.RunTask( ctx.Context, ratelimit.HandleOverlaps, func(_ context.Context) { @@ -1107,7 +1111,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnSaveCacheFinished() // handle region stats - ctx.TaskRunner.RunTask( + ctx.MiscRunner.RunTask( ctx.Context, 
ratelimit.CollectRegionStatsAsync, func(_ context.Context) { @@ -1121,7 +1125,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnCollectRegionStatsFinished() if c.storage != nil { if saveKV { - ctx.TaskRunner.RunTask( + ctx.MiscRunner.RunTask( ctx.Context, ratelimit.SaveRegionToKV, func(_ context.Context) { diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index 43602dbb68d..39720e7d765 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -40,10 +40,11 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { tracer = core.NewHeartbeatProcessTracer() } defer tracer.Release() - var taskRunner, logRunner ratelimit.Runner - taskRunner, logRunner = syncRunner, syncRunner + var taskRunner, miscRunner, logRunner ratelimit.Runner + taskRunner, miscRunner, logRunner = syncRunner, syncRunner, syncRunner if c.GetScheduleConfig().EnableHeartbeatConcurrentRunner { - taskRunner = c.heartbeatRunnner + taskRunner = c.heartbeatRunner + miscRunner = c.miscRunner logRunner = c.logRunner } @@ -51,6 +52,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { Context: c.ctx, Tracer: tracer, TaskRunner: taskRunner, + MiscRunner: miscRunner, LogRunner: logRunner, } tracer.Begin()