Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Jul 7, 2023
1 parent 1015d67 commit 5a34eab
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 24 deletions.
34 changes: 18 additions & 16 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,9 +462,7 @@ func (c *ResourceGroupsController) OnRequestWait(
failedRequestCounter.WithLabelValues(resourceGroupName).Inc()
return nil, nil, err
}
jobTypes := gc.meta.BackgroundSettings.JobTypes
if JobTypesContains(jobTypes, info.RequestSource()) {
// If the resource group has background jobs, we should not wait for it.
if gc.isBackgroundRequest(info.RequestSource()) {
return nil, nil, nil
}
return gc.onRequestWait(ctx, info)
Expand All @@ -479,9 +477,7 @@ func (c *ResourceGroupsController) OnResponse(
log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName))
return &rmpb.Consumption{}, nil
}
jobTypes := tmp.(*groupCostController).meta.BackgroundSettings.JobTypes
if JobTypesContains(jobTypes, req.RequestSource()) {
// If the resource group has background jobs, we should not wait for it.
if tmp.(*groupCostController).isBackgroundRequest(req.RequestSource()) {
return nil, nil
}
return tmp.(*groupCostController).onResponse(req, resp)
Expand Down Expand Up @@ -1059,6 +1055,22 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType
return req
}

// If the resource group has background jobs, we should not record consumption and wait for it.
func (gc *groupCostController) isBackgroundRequest(requestResource string) bool {
gc.metaLock.Lock()
defer gc.metaLock.Unlock()

if gc.meta.BackgroundSettings != nil {
jobTypes := gc.meta.BackgroundSettings.JobTypes
for _, jobType := range jobTypes {
if strings.Contains(requestResource, jobType) {
return true
}
}
}
return false
}

func (gc *groupCostController) getMeta() *rmpb.ResourceGroup {
gc.metaLock.Lock()
defer gc.metaLock.Unlock()
Expand Down Expand Up @@ -1215,13 +1227,3 @@ func (gc *groupCostController) getKVCalculator() *KVCalculator {
}
return nil
}

// JobTypesContains returns true if the given value contains the slice.
func JobTypesContains(slice []string, value string) bool {
for _, v := range slice {
if strings.Contains(value, v) {
return true
}
}
return false
}
5 changes: 5 additions & 0 deletions client/resource_group/controller/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ func (tri *TestRequestInfo) RequestSource() string {
return tri.requestSource
}

// SetRequestSource set the request source.
func (tri *TestRequestInfo) SetRequestSource(requestSource string) {
tri.requestSource = requestSource
}

// TestResponseInfo is used to test the response info interface.
type TestResponseInfo struct {
readBytes uint64
Expand Down
95 changes: 87 additions & 8 deletions tests/integrations/mcs/resourcemanager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,20 @@ func (suite *resourceManagerClientTestSuite) SetupSuite() {
},
},
},
{
Name: "background_job",
Mode: rmpb.GroupMode_RUMode,
RUSettings: &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: 10000,
BurstLimit: -1,
},
Tokens: 100000,
},
},
BackgroundSettings: &rmpb.BackgroundSettings{JobTypes: []string{"br", "lightning"}},
},
}
}

Expand Down Expand Up @@ -576,7 +590,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
resp := controller.NewTestResponseInfo(0, time.Duration(30), true)
_, penalty, err := c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Equal(penalty.WriteBytes, float64(0))
re.Equal(penalty.WriteBytes, 0.0)
re.Equal(penalty.TotalCpuTimeMs, 0.0)
_, err = c.OnResponse(resourceGroupName, req, resp)
re.NoError(err)
Expand All @@ -585,7 +599,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
resp = controller.NewTestResponseInfo(0, time.Duration(10), true)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Equal(penalty.WriteBytes, float64(0))
re.Equal(penalty.WriteBytes, 0.0)
re.Equal(penalty.TotalCpuTimeMs, 0.0)
_, err = c.OnResponse(resourceGroupName, req, resp)
re.NoError(err)
Expand All @@ -595,7 +609,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
resp = controller.NewTestResponseInfo(0, time.Duration(0), false)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Equal(penalty.WriteBytes, float64(0))
re.Equal(penalty.WriteBytes, 0.0)
re.Equal(penalty.TotalCpuTimeMs, 0.0)
_, err = c.OnResponse(resourceGroupName, req, resp)
re.NoError(err)
Expand All @@ -605,7 +619,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
resp1 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1)
re.NoError(err)
re.Equal(penalty.WriteBytes, float64(0))
re.Equal(penalty.WriteBytes, 0.0)
_, err = c.OnResponse(resourceGroupName, req1, resp1)
re.NoError(err)

Expand All @@ -614,7 +628,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
resp2 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2)
re.NoError(err)
re.Equal(penalty.WriteBytes, float64(60))
re.Equal(penalty.WriteBytes, 60.0)
re.InEpsilon(penalty.TotalCpuTimeMs, 10.0/1000.0/1000.0, 1e-6)
_, err = c.OnResponse(resourceGroupName, req2, resp2)
re.NoError(err)
Expand All @@ -624,7 +638,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
resp3 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3)
re.NoError(err)
re.Equal(penalty.WriteBytes, float64(0))
re.Equal(penalty.WriteBytes, 0.0)
_, err = c.OnResponse(resourceGroupName, req3, resp3)
re.NoError(err)

Expand All @@ -634,9 +648,11 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
resp4 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4)
re.NoError(err)
re.Equal(penalty.WriteBytes, float64(0))
re.Equal(penalty.WriteBytes, 0.0)
_, err = c.OnResponse(resourceGroupName, req4, resp4)
re.NoError(err)

c.Stop()
}

func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() {
Expand Down Expand Up @@ -678,7 +694,7 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() {
re.NoError(err)
for _, resp := range aresp {
re.Len(resp.GrantedRUTokens, 1)
re.Equal(resp.GrantedRUTokens[0].GrantedTokens.Tokens, float64(30000.))
re.Equal(resp.GrantedRUTokens[0].GrantedTokens.Tokens, 30000.)
if resp.ResourceGroupName == "test2" {
re.Equal(int64(-1), resp.GrantedRUTokens[0].GrantedTokens.GetSettings().GetBurstLimit())
}
Expand All @@ -702,6 +718,14 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() {
re.NoError(err)
checkFunc(gresp, groups[0])
}

reqs.Background = true
aresp, err := cli.AcquireTokenBuckets(suite.ctx, reqs)
re.NoError(err)
for _, resp := range aresp {
re.Len(resp.GrantedRUTokens, 0)
}

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/resourcemanager/server/fastPersist"))
}

Expand Down Expand Up @@ -1118,3 +1142,58 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() {
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/fastCleanup"))
controller.Stop()
}

func (suite *resourceManagerClientTestSuite) TestSkipConsumptionForBackgroundJobs() {
re := suite.Require()
cli := suite.client

for _, group := range suite.initGroups {
resp, err := cli.AddResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")
}

cfg := &controller.RequestUnitConfig{
ReadBaseCost: 1,
ReadCostPerByte: 1,
WriteBaseCost: 1,
WriteCostPerByte: 1,
CPUMsCost: 1,
}
c, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg)
c.Start(suite.ctx)

resourceGroupName := suite.initGroups[1].Name
req := controller.NewTestRequestInfo(false, 0, 1)
resp := controller.NewTestResponseInfo(0, time.Duration(30), true)
consumption, penalty, err := c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.NotNil(consumption)
re.NotNil(penalty)
consumption, err = c.OnResponse(resourceGroupName, req, resp)
re.NotNil(consumption)
re.NoError(err)

resourceGroupName = "background_job"
// Check background job `br` will not consume tokens.
req.SetRequestSource("br")
consumption, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Nil(consumption)
re.Nil(penalty)
consumption, err = c.OnResponse(resourceGroupName, req, resp)
re.Nil(consumption)
re.NoError(err)

// Check background job `lightning` will not consume tokens.
req.SetRequestSource("lightning")
consumption, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Nil(consumption)
re.Nil(penalty)
consumption, err = c.OnResponse(resourceGroupName, req, resp)
re.Nil(consumption)
re.NoError(err)

c.Stop()
}

0 comments on commit 5a34eab

Please sign in to comment.