diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index f72f0ff817b..ed6a3ee501c 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -129,20 +129,6 @@ func (suite *resourceManagerClientTestSuite) SetupSuite() { }, }, }, - { - Name: "background_job", - Mode: rmpb.GroupMode_RUMode, - RUSettings: &rmpb.GroupRequestUnitSettings{ - RU: &rmpb.TokenBucket{ - Settings: &rmpb.TokenLimitSettings{ - FillRate: 10000, - BurstLimit: -1, - }, - Tokens: 100000, - }, - }, - BackgroundSettings: &rmpb.BackgroundSettings{JobTypes: []string{"br", "lightning"}}, - }, } } @@ -191,19 +177,17 @@ func (suite *resourceManagerClientTestSuite) resignAndWaitLeader() { } func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { - // TODO: fix the unstable part at line 248. - suite.T().Skip() re := suite.Require() cli := suite.client + groupNamePrefix := "watch_test" group := &rmpb.ResourceGroup{ - Name: "test", + Name: groupNamePrefix, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ Settings: &rmpb.TokenLimitSettings{ FillRate: 10000, }, - Tokens: 100000, }, }, } @@ -215,7 +199,7 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { var meta *rmpb.ResourceGroup groupsNum := 10 for i := 0; i < groupsNum; i++ { - group.Name = "test" + strconv.Itoa(i) + group.Name = groupNamePrefix + strconv.Itoa(i) resp, err := cli.AddResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") @@ -228,25 +212,25 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { re.NotNil(meta) } // Mock modify resource groups - modifySettings := func(gs *rmpb.ResourceGroup) { + modifySettings := func(gs *rmpb.ResourceGroup, fillRate uint64) { gs.RUSettings = &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ Settings: &rmpb.TokenLimitSettings{ - FillRate: 20000, + FillRate: fillRate, }, }, } } for i := 0; i < groupsNum; i++ { - group.Name = "test" + strconv.Itoa(i) - modifySettings(group) + group.Name = groupNamePrefix + strconv.Itoa(i) + modifySettings(group, 20000) resp, err := cli.ModifyResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") } for i := 0; i < groupsNum; i++ { testutil.Eventually(re, func() bool { - name := "test" + strconv.Itoa(i) + name := groupNamePrefix + strconv.Itoa(i) meta = controller.GetActiveResourceGroup(name) if meta != nil { return meta.RUSettings.RU.Settings.FillRate == uint64(20000) @@ -257,7 +241,7 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { // Mock reset watch stream re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/watchStreamError", "return(true)")) - group.Name = "test" + strconv.Itoa(groupsNum) + group.Name = groupNamePrefix + strconv.Itoa(groupsNum) resp, err := cli.AddResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") @@ -265,13 +249,13 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { meta, err = controller.GetResourceGroup(group.Name) re.NotNil(meta) re.NoError(err) - modifySettings(group) + modifySettings(group, 30000) resp, err = cli.ModifyResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") testutil.Eventually(re, func() bool { meta = controller.GetActiveResourceGroup(group.Name) - return meta.RUSettings.RU.Settings.FillRate == uint64(20000) + return meta.RUSettings.RU.Settings.FillRate == uint64(30000) }, testutil.WithTickInterval(100*time.Millisecond)) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/watchStreamError")) @@ -279,7 +263,7 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { suite.cleanupResourceGroups() for i := 0; i < groupsNum; i++ { testutil.Eventually(re, func() bool { - name := "test" + strconv.Itoa(i) + name := groupNamePrefix + strconv.Itoa(i) meta = controller.GetActiveResourceGroup(name) return meta == nil }, testutil.WithTickInterval(50*time.Millisecond)) @@ -305,7 +289,7 @@ func (suite *resourceManagerClientTestSuite) TestWatchWithSingleGroupByKeyspace( // Mock add resource group. group := &rmpb.ResourceGroup{ - Name: "test", + Name: "keyspace_test", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ @@ -328,6 +312,28 @@ func (suite *resourceManagerClientTestSuite) TestWatchWithSingleGroupByKeyspace( controllerKeySpace.OnRequestWait(suite.ctx, group.Name, tcs.makeReadRequest()) metaKeySpace := controllerKeySpace.GetActiveResourceGroup(group.Name) re.Equal(metaKeySpace.RUSettings.RU, group.RUSettings.RU) + + // Mock modify resource groups + modifySettings := func(gs *rmpb.ResourceGroup, fillRate uint64) { + gs.RUSettings = &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: fillRate, + }, + }, + } + } + modifySettings(group, 20000) + resp, err = cli.ModifyResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + + testutil.Eventually(re, func() bool { + meta = controller.GetActiveResourceGroup(group.Name) + return meta.RUSettings.RU.Settings.FillRate == uint64(20000) + }, testutil.WithTickInterval(100*time.Millisecond)) + metaKeySpace = controllerKeySpace.GetActiveResourceGroup(group.Name) + re.Equal(metaKeySpace.RUSettings.RU.Settings.FillRate, uint64(10000)) } const buffDuration = time.Millisecond * 300 @@ -367,11 +373,21 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { re := suite.Require() cli := suite.client - for _, group := range suite.initGroups { - resp, err := cli.AddResourceGroup(suite.ctx, group) - re.NoError(err) - re.Contains(resp, "Success!") + rg := &rmpb.ResourceGroup{ + Name: "controller_test", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: 10000, + }, + Tokens: 100000, + }, + }, } + resp, err := cli.AddResourceGroup(suite.ctx, rg) + re.NoError(err) + re.Contains(resp, "Success!") cfg := &controller.RequestUnitConfig{ ReadBaseCost: 1, @@ -390,7 +406,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { len int }{ { - resourceGroupName: suite.initGroups[0].Name, + resourceGroupName: rg.Name, len: 8, tcs: []tokenConsumptionPerSecond{ {rruTokensAtATime: 50, wruTokensAtATime: 20, times: 100, waitDuration: 0}, @@ -441,7 +457,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate", "return(true)")) tcs := tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 900000000, times: 1, waitDuration: 0} wreq := tcs.makeWriteRequest() - _, _, err := controller.OnRequestWait(suite.ctx, suite.initGroups[0].Name, wreq) + _, _, err = controller.OnRequestWait(suite.ctx, rg.Name, wreq) re.Error(err) time.Sleep(time.Millisecond * 200) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate")) @@ -584,7 +600,22 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { re := suite.Require() cli := suite.client - for _, group := range suite.initGroups { + groupNames := []string{"penalty_test1", "penalty_test2"} + // Mock add 2 resource groups. + group := &rmpb.ResourceGroup{ + Name: groupNames[0], + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: 10000, + }, + Tokens: 100000, + }, + }, + } + for _, name := range groupNames { + group.Name = name resp, err := cli.AddResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") @@ -600,7 +631,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { c, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, controller.EnableSingleGroupByKeyspace()) c.Start(suite.ctx) - resourceGroupName := suite.initGroups[1].Name + resourceGroupName := groupNames[0] // init req := controller.NewTestRequestInfo(false, 0, 2 /* store2 */) resp := controller.NewTestResponseInfo(0, time.Duration(30), true) @@ -659,7 +690,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { re.NoError(err) // from different group, should be zero - resourceGroupName = suite.initGroups[2].Name + resourceGroupName = groupNames[1] req4 := controller.NewTestRequestInfo(true, 50, 1 /* store2 */) resp4 := controller.NewTestResponseInfo(0, time.Duration(10), true) _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4) @@ -676,8 +707,7 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() { re := suite.Require() cli := suite.client - groups := make([]*rmpb.ResourceGroup, 0) - groups = append(groups, suite.initGroups...) + groups := suite.initGroups for _, group := range groups { resp, err := cli.AddResourceGroup(suite.ctx, group) re.NoError(err) @@ -689,7 +719,6 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() { } re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/resourcemanager/server/fastPersist", `return(true)`)) suite.resignAndWaitLeader() - groups = append(groups, &rmpb.ResourceGroup{Name: "test3"}) for i := 0; i < 3; i++ { for _, group := range groups { requests := make([]*rmpb.RequestUnitItem, 0) @@ -762,8 +791,8 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { expectMarshal string modifySettings func(*rmpb.ResourceGroup) }{ - {"test1", rmpb.GroupMode_RUMode, true, true, - `{"name":"test1","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":10000},"state":{"initialized":false}}},"priority":0}`, + {"CRUD_test1", rmpb.GroupMode_RUMode, true, true, + `{"name":"CRUD_test1","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":10000},"state":{"initialized":false}}},"priority":0}`, func(gs *rmpb.ResourceGroup) { gs.RUSettings = &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ @@ -775,8 +804,8 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { }, }, - {"test2", rmpb.GroupMode_RUMode, true, true, - `{"name":"test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":20000},"state":{"initialized":false}}},"priority":0,"runaway_settings":{"rule":{"exec_elapsed_time_ms":10000},"action":2},"background_settings":{"job_types":["test"]}}`, + {"CRUD_test2", rmpb.GroupMode_RUMode, true, true, + `{"name":"CRUD_test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":20000},"state":{"initialized":false}}},"priority":0,"runaway_settings":{"rule":{"exec_elapsed_time_ms":10000},"action":2},"background_settings":{"job_types":["test"]}}`, func(gs *rmpb.ResourceGroup) { gs.RUSettings = &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ @@ -796,8 +825,8 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { } }, }, - {"test2", rmpb.GroupMode_RUMode, false, true, - `{"name":"test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":30000,"burst_limit":-1},"state":{"initialized":false}}},"priority":0,"runaway_settings":{"rule":{"exec_elapsed_time_ms":1000},"action":3,"watch":{"lasting_duration_ms":100000,"type":2}},"background_settings":{"job_types":["br","lightning"]}}`, + {"CRUD_test2", rmpb.GroupMode_RUMode, false, true, + `{"name":"CRUD_test2","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":30000,"burst_limit":-1},"state":{"initialized":false}}},"priority":0,"runaway_settings":{"rule":{"exec_elapsed_time_ms":1000},"action":3,"watch":{"lasting_duration_ms":100000,"type":2}},"background_settings":{"job_types":["br","lightning"]}}`, func(gs *rmpb.ResourceGroup) { gs.RUSettings = &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ @@ -1033,15 +1062,10 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientFailover() cli := suite.client group := &rmpb.ResourceGroup{ - Name: "test3", + Name: "failover_test", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{ - RU: &rmpb.TokenBucket{ - Settings: &rmpb.TokenLimitSettings{ - FillRate: 10000, - }, - Tokens: 100000, - }, + RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{}}, }, } addResp, err := cli.AddResourceGroup(suite.ctx, group) @@ -1070,8 +1094,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMo re := suite.Require() cli := suite.client + groupName := "mode_test" group := &rmpb.ResourceGroup{ - Name: "modetest", + Name: groupName, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{ @@ -1106,16 +1131,15 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMo rruTokensAtATime: 0, wruTokensAtATime: 2, } - resourceName := "modetest" - controller.OnRequestWait(suite.ctx, resourceName, tc.makeWriteRequest()) + controller.OnRequestWait(suite.ctx, groupName, tc.makeWriteRequest()) time.Sleep(time.Second * 2) beginTime := time.Now() // This is used to make sure resource group in lowRU. for i := 0; i < 100; i++ { - controller.OnRequestWait(suite.ctx, resourceName, tc2.makeWriteRequest()) + controller.OnRequestWait(suite.ctx, groupName, tc2.makeWriteRequest()) } for i := 0; i < 100; i++ { - controller.OnRequestWait(suite.ctx, resourceName, tc.makeWriteRequest()) + controller.OnRequestWait(suite.ctx, groupName, tc.makeWriteRequest()) } endTime := time.Now() // we can not check `inDegradedMode` because of data race. @@ -1167,18 +1191,20 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() { re := suite.Require() cli := suite.client - for _, group := range suite.initGroups { - resp, err := cli.AddResourceGroup(suite.ctx, group) - re.NoError(err) - re.Contains(resp, "Success!") + // Mock add resource group. + group := &rmpb.ResourceGroup{ + Name: "stale_test", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{}}, + }, } + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") - ruConfig := &controller.RequestUnitConfig{ - ReadBaseCost: 1, - ReadCostPerByte: 1, - } re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/fastCleanup", `return(true)`)) - controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, ruConfig) + controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) controller.Start(suite.ctx) testConfig := struct { @@ -1194,13 +1220,13 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() { rreq := testConfig.tcs.makeReadRequest() rres := testConfig.tcs.makeReadResponse() for j := 0; j < testConfig.times; j++ { - controller.OnRequestWait(suite.ctx, suite.initGroups[0].Name, rreq) - controller.OnResponse(suite.initGroups[0].Name, rreq, rres) + controller.OnRequestWait(suite.ctx, group.Name, rreq) + controller.OnResponse(group.Name, rreq, rres) time.Sleep(100 * time.Microsecond) } time.Sleep(1 * time.Second) - re.Nil(controller.GetActiveResourceGroup(suite.initGroups[0].Name)) + re.Nil(controller.GetActiveResourceGroup(group.Name)) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/fastCleanup")) controller.Stop() @@ -1210,47 +1236,52 @@ func (suite *resourceManagerClientTestSuite) TestCheckBackgroundJobs() { re := suite.Require() cli := suite.client - for _, group := range suite.initGroups { - resp, err := cli.AddResourceGroup(suite.ctx, group) - re.NoError(err) - re.Contains(resp, "Success!") + enableBackgroundGroup := func(enable bool) string { + if enable { + return "background_enable" + } else { + return "background_unable" + } } - - cfg := &controller.RequestUnitConfig{ - ReadBaseCost: 1, - ReadCostPerByte: 1, - WriteBaseCost: 1, - WriteCostPerByte: 1, - CPUMsCost: 1, + // Mock add resource group. + group := &rmpb.ResourceGroup{ + Name: enableBackgroundGroup(false), + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{}}, + }, } - c, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg) + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + group.Name = enableBackgroundGroup(true) + group.BackgroundSettings = &rmpb.BackgroundSettings{JobTypes: []string{"br", "lightning"}} + resp, err = cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + + c, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) c.Start(suite.ctx) - resourceGroupName := suite.initGroups[0].Name + resourceGroupName := enableBackgroundGroup(false) re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_default")) // test fallback for nil. re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_lightning")) re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_ddl")) re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "")) - resourceGroupName = "background_job" + resourceGroupName = enableBackgroundGroup(true) re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_br")) re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_lightning")) // test fallback for nil. re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_ddl")) // modify `Default` to check fallback. - resp, err := cli.ModifyResourceGroup(suite.ctx, &rmpb.ResourceGroup{ + resp, err = cli.ModifyResourceGroup(suite.ctx, &rmpb.ResourceGroup{ Name: "default", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{ - RU: &rmpb.TokenBucket{ - Settings: &rmpb.TokenLimitSettings{ - FillRate: 1, - BurstLimit: -1, - }, - Tokens: 1, - }, + RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{}}, }, BackgroundSettings: &rmpb.BackgroundSettings{JobTypes: []string{"lightning", "ddl"}}, }) @@ -1265,14 +1296,14 @@ func (suite *resourceManagerClientTestSuite) TestCheckBackgroundJobs() { return false }, testutil.WithTickInterval(50*time.Millisecond)) - resourceGroupName = suite.initGroups[0].Name + resourceGroupName = enableBackgroundGroup(false) re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_default")) // test fallback for `"lightning", "ddl"`. re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_lightning")) re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_ddl")) re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "")) - resourceGroupName = "background_job" + resourceGroupName = enableBackgroundGroup(true) re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_br")) re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_lightning")) // test fallback for `"lightning", "ddl"`. @@ -1284,11 +1315,18 @@ func (suite *resourceManagerClientTestSuite) TestCheckBackgroundJobs() { func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigChanged() { re := suite.Require() cli := suite.client - for _, group := range suite.initGroups { - resp, err := cli.AddResourceGroup(suite.ctx, group) - re.NoError(err) - re.Contains(resp, "Success!") + // Mock add resource group. + group := &rmpb.ResourceGroup{ + Name: "config_change_test", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{}}, + }, } + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + c1, err := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) re.NoError(err) c1.Start(suite.ctx)