diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 2ed63dc7bbad..21659abf21bb 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -58,8 +58,10 @@ const ( type ResourceGroupKVInterceptor interface { // OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time. OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, error) - // OnResponse is used to consume tokens after receiving response + // OnResponse is used to consume tokens after receiving response. OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error) + // IsBackgroundRequest is used to check whether the request is background job. + IsBackgroundRequest(ctx context.Context, resourceGroupName, requestResource string) bool } // ResourceGroupProvider provides some api to interact with resource manager server. @@ -456,12 +458,8 @@ func (c *ResourceGroupsController) OnRequestWait( ) (*rmpb.Consumption, *rmpb.Consumption, error) { gc, err := c.tryGetResourceGroup(ctx, resourceGroupName) if err != nil { - failedRequestCounter.WithLabelValues(resourceGroupName).Inc() return nil, nil, err } - if gc.isBackgroundRequest(info.RequestSource()) { - return nil, nil, nil - } return gc.onRequestWait(ctx, info) } @@ -474,12 +472,29 @@ func (c *ResourceGroupsController) OnResponse( log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName)) return &rmpb.Consumption{}, nil } - if tmp.(*groupCostController).isBackgroundRequest(req.RequestSource()) { - return nil, nil - } return tmp.(*groupCostController).onResponse(req, resp) } +// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it. +func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context, + resourceGroupName, requestResource string) bool { + gc, err := c.tryGetResourceGroup(ctx, resourceGroupName) + if err != nil { + failedRequestCounter.WithLabelValues(resourceGroupName).Inc() + return false + } + + if bg := gc.meta.BackgroundSettings; bg != nil { + if len(requestResource) == 0 || len(bg.JobTypes) == 0 { + return false + } + if idx := strings.LastIndex(requestResource, "_"); idx != -1 { + return slices.Contains(bg.JobTypes, requestResource[idx+1:]) + } + } + return false +} + // GetResourceGroup returns the meta setting of the given resource group name. func (c *ResourceGroupsController) GetResourceGroup(resourceGroupName string) (*rmpb.ResourceGroup, error) { gc, err := c.tryGetResourceGroup(c.loopCtx, resourceGroupName) @@ -1052,22 +1067,6 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType return req } -// If the resource group has background jobs, we should not record consumption and wait for it. -func (gc *groupCostController) isBackgroundRequest(requestResource string) bool { - gc.metaLock.Lock() - defer gc.metaLock.Unlock() - - if bg := gc.meta.BackgroundSettings; bg != nil { - if len(requestResource) == 0 || len(bg.JobTypes) == 0 { - return false - } - if idx := strings.LastIndex(requestResource, "_"); idx != -1 { - return slices.Contains(bg.JobTypes, requestResource[idx+1:]) - } - } - return false -} - func (gc *groupCostController) getMeta() *rmpb.ResourceGroup { gc.metaLock.Lock() defer gc.metaLock.Unlock() diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index 55307a8468a5..880d91780aa8 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -115,14 +115,3 @@ func TestRequestAndResponseConsumption(t *testing.T) { re.Equal(expectedConsumption.TotalCpuTimeMs, consumption.TotalCpuTimeMs, caseNum) } } - -func TestIsBackgroundRequest(t *testing.T) { - re := require.New(t) - gc := createTestGroupCostController(re) - re.False(gc.isBackgroundRequest("test")) - re.False(gc.isBackgroundRequest("unknown_default")) - re.True(gc.isBackgroundRequest("internal_lightning")) - re.False(gc.isBackgroundRequest("internal_lightning_default")) - re.False(gc.isBackgroundRequest("external_test_default")) - re.True(gc.isBackgroundRequest("external_unknown_lightning")) -} diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index 76d69d8c559c..32aa5e8a43f6 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -1044,15 +1044,16 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMo rruTokensAtATime: 0, wruTokensAtATime: 2, } - controller.OnRequestWait(suite.ctx, "modetest", tc.makeWriteRequest()) + resourceName := "modetest" + controller.OnRequestWait(suite.ctx, resourceName, tc.makeWriteRequest()) time.Sleep(time.Second * 2) beginTime := time.Now() // This is used to make sure resource group in lowRU. for i := 0; i < 100; i++ { - controller.OnRequestWait(suite.ctx, "modetest", tc2.makeWriteRequest()) + controller.OnRequestWait(suite.ctx, resourceName, tc2.makeWriteRequest()) } for i := 0; i < 100; i++ { - controller.OnRequestWait(suite.ctx, "modetest", tc.makeWriteRequest()) + controller.OnRequestWait(suite.ctx, resourceName, tc.makeWriteRequest()) } endTime := time.Now() // we can not check `inDegradedMode` because of data race. @@ -1165,35 +1166,14 @@ func (suite *resourceManagerClientTestSuite) TestSkipConsumptionForBackgroundJob resourceGroupName := suite.initGroups[1].Name req := controller.NewTestRequestInfo(false, 0, 1) - resp := controller.NewTestResponseInfo(0, time.Duration(30), true) - consumption, penalty, err := c.OnRequestWait(suite.ctx, resourceGroupName, req) - re.NoError(err) - re.NotNil(consumption) - re.NotNil(penalty) - consumption, err = c.OnResponse(resourceGroupName, req, resp) - re.NotNil(consumption) - re.NoError(err) + re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, req.RequestSource())) resourceGroupName = "background_job" - // Check background job `br` will not consume tokens. - req.SetRequestSource("br") - consumption, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) - re.NoError(err) - re.Nil(consumption) - re.Nil(penalty) - consumption, err = c.OnResponse(resourceGroupName, req, resp) - re.Nil(consumption) - re.NoError(err) + req.SetRequestSource("internal_br") + re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, req.RequestSource())) - // Check background job `lightning` will not consume tokens. - req.SetRequestSource("lightning") - consumption, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req) - re.NoError(err) - re.Nil(consumption) - re.Nil(penalty) - consumption, err = c.OnResponse(resourceGroupName, req, resp) - re.Nil(consumption) - re.NoError(err) + req.SetRequestSource("internal_lightning") + re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, req.RequestSource())) c.Stop() }