diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 46d8827c2b6b..a62a26159ae5 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -462,9 +462,7 @@ func (c *ResourceGroupsController) OnRequestWait( failedRequestCounter.WithLabelValues(resourceGroupName).Inc() return nil, nil, err } - jobTypes := gc.meta.BackgroundSettings.JobTypes - if JobTypesContains(jobTypes, info.RequestSource()) { - // If the resource group has background jobs, we should not wait for it. + if gc.isBackgroundRequest(info.RequestSource()) { return nil, nil, nil } return gc.onRequestWait(ctx, info) @@ -479,9 +477,7 @@ func (c *ResourceGroupsController) OnResponse( log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName)) return &rmpb.Consumption{}, nil } - jobTypes := tmp.(*groupCostController).meta.BackgroundSettings.JobTypes - if JobTypesContains(jobTypes, req.RequestSource()) { - // If the resource group has background jobs, we should not wait for it. + if tmp.(*groupCostController).isBackgroundRequest(req.RequestSource()) { return nil, nil } return tmp.(*groupCostController).onResponse(req, resp) @@ -1059,6 +1055,22 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType return req } +// If the resource group has background jobs, we should not record consumption and wait for it. +func (gc *groupCostController) isBackgroundRequest(requestResource string) bool { + gc.metaLock.Lock() + defer gc.metaLock.Unlock() + + if gc.meta.BackgroundSettings != nil { + jobTypes := gc.meta.BackgroundSettings.JobTypes + for _, jobType := range jobTypes { + if strings.Contains(requestResource, jobType) { + return true + } + } + } + return false +} + func (gc *groupCostController) getMeta() *rmpb.ResourceGroup { gc.metaLock.Lock() defer gc.metaLock.Unlock() @@ -1215,13 +1227,3 @@ func (gc *groupCostController) getKVCalculator() *KVCalculator { } return nil } - -// JobTypesContains returns true if the given value contains the slice. -func JobTypesContains(slice []string, value string) bool { - for _, v := range slice { - if strings.Contains(value, v) { - return true - } - } - return false -} diff --git a/client/resource_group/controller/testutil.go b/client/resource_group/controller/testutil.go index f120f81420c9..2d622e59bc02 100644 --- a/client/resource_group/controller/testutil.go +++ b/client/resource_group/controller/testutil.go @@ -62,6 +62,11 @@ func (tri *TestRequestInfo) RequestSource() string { return tri.requestSource } +// SetRequestSource set the request source. +func (tri *TestRequestInfo) SetRequestSource(requestSource string) { + tri.requestSource = requestSource +} + // TestResponseInfo is used to test the response info interface. type TestResponseInfo struct { readBytes uint64 diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index 064d0bfe80e4..67880df053b6 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -128,6 +128,20 @@ func (suite *resourceManagerClientTestSuite) SetupSuite() { }, }, }, + { + Name: "background_job", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: 10000, + BurstLimit: -1, + }, + Tokens: 100000, + }, + }, + BackgroundSettings: &rmpb.BackgroundSettings{JobTypes: []string{"br", "lightning"}}, + }, } } @@ -576,7 +590,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { resp := controller.NewTestResponseInfo(0, time.Duration(30), true) _, penalty, err := c.OnRequestWait(suite.ctx, resourceGroupName, req) re.NoError(err) - re.Equal(penalty.WriteBytes, float64(0)) + re.Equal(penalty.WriteBytes, 0.0) re.Equal(penalty.TotalCpuTimeMs, 0.0) _, err = c.OnResponse(resourceGroupName, req, resp) re.NoError(err) @@ -585,7 +599,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { resp = controller.NewTestResponseInfo(0, time.Duration(10), true) _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) re.NoError(err) - re.Equal(penalty.WriteBytes, float64(0)) + re.Equal(penalty.WriteBytes, 0.0) re.Equal(penalty.TotalCpuTimeMs, 0.0) _, err = c.OnResponse(resourceGroupName, req, resp) re.NoError(err) @@ -595,7 +609,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { resp = controller.NewTestResponseInfo(0, time.Duration(0), false) _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) re.NoError(err) - re.Equal(penalty.WriteBytes, float64(0)) + re.Equal(penalty.WriteBytes, 0.0) re.Equal(penalty.TotalCpuTimeMs, 0.0) _, err = c.OnResponse(resourceGroupName, req, resp) re.NoError(err) @@ -605,7 +619,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { resp1 := controller.NewTestResponseInfo(0, time.Duration(10), true) _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1) re.NoError(err) - re.Equal(penalty.WriteBytes, float64(0)) + re.Equal(penalty.WriteBytes, 0.0) _, err = c.OnResponse(resourceGroupName, req1, resp1) re.NoError(err) @@ -614,7 +628,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { resp2 := controller.NewTestResponseInfo(0, time.Duration(10), true) _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2) re.NoError(err) - re.Equal(penalty.WriteBytes, float64(60)) + re.Equal(penalty.WriteBytes, 60.0) re.InEpsilon(penalty.TotalCpuTimeMs, 10.0/1000.0/1000.0, 1e-6) _, err = c.OnResponse(resourceGroupName, req2, resp2) re.NoError(err) @@ -624,7 +638,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { resp3 := controller.NewTestResponseInfo(0, time.Duration(10), true) _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3) re.NoError(err) - re.Equal(penalty.WriteBytes, float64(0)) + re.Equal(penalty.WriteBytes, 0.0) _, err = c.OnResponse(resourceGroupName, req3, resp3) re.NoError(err) @@ -634,9 +648,11 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { resp4 := controller.NewTestResponseInfo(0, time.Duration(10), true) _, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4) re.NoError(err) - re.Equal(penalty.WriteBytes, float64(0)) + re.Equal(penalty.WriteBytes, 0.0) _, err = c.OnResponse(resourceGroupName, req4, resp4) re.NoError(err) + + c.Stop() } func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() { @@ -678,7 +694,7 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() { re.NoError(err) for _, resp := range aresp { re.Len(resp.GrantedRUTokens, 1) - re.Equal(resp.GrantedRUTokens[0].GrantedTokens.Tokens, float64(30000.)) + re.Equal(resp.GrantedRUTokens[0].GrantedTokens.Tokens, 30000.) if resp.ResourceGroupName == "test2" { re.Equal(int64(-1), resp.GrantedRUTokens[0].GrantedTokens.GetSettings().GetBurstLimit()) } @@ -702,6 +718,14 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() { re.NoError(err) checkFunc(gresp, groups[0]) } + + reqs.Background = true + aresp, err := cli.AcquireTokenBuckets(suite.ctx, reqs) + re.NoError(err) + for _, resp := range aresp { + re.Len(resp.GrantedRUTokens, 0) + } + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/resourcemanager/server/fastPersist")) } @@ -1118,3 +1142,58 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() { re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/fastCleanup")) controller.Stop() } + +func (suite *resourceManagerClientTestSuite) TestSkipConsumptionForBackgroundJobs() { + re := suite.Require() + cli := suite.client + + for _, group := range suite.initGroups { + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + } + + cfg := &controller.RequestUnitConfig{ + ReadBaseCost: 1, + ReadCostPerByte: 1, + WriteBaseCost: 1, + WriteCostPerByte: 1, + CPUMsCost: 1, + } + c, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg) + c.Start(suite.ctx) + + resourceGroupName := suite.initGroups[1].Name + req := controller.NewTestRequestInfo(false, 0, 1) + resp := controller.NewTestResponseInfo(0, time.Duration(30), true) + consumption, penalty, err := c.OnRequestWait(suite.ctx, resourceGroupName, req) + re.NoError(err) + re.NotNil(consumption) + re.NotNil(penalty) + consumption, err = c.OnResponse(resourceGroupName, req, resp) + re.NotNil(consumption) + re.NoError(err) + + resourceGroupName = "background_job" + // Check background job `br` will not consume tokens. + req.SetRequestSource("br") + consumption, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) + re.NoError(err) + re.Nil(consumption) + re.Nil(penalty) + consumption, err = c.OnResponse(resourceGroupName, req, resp) + re.Nil(consumption) + re.NoError(err) + + // Check background job `lightning` will not consume tokens. + req.SetRequestSource("lightning") + consumption, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) + re.NoError(err) + re.Nil(consumption) + re.Nil(penalty) + consumption, err = c.OnResponse(resourceGroupName, req, resp) + re.Nil(consumption) + re.NoError(err) + + c.Stop() +}