Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Jul 13, 2023
1 parent c6edabc commit 242132e
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 64 deletions.
47 changes: 23 additions & 24 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ const (
type ResourceGroupKVInterceptor interface {
// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, error)
// OnResponse is used to consume tokens after receiving response
// OnResponse is used to consume tokens after receiving response.
OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
// IsBackgroundRequest is used to check whether the request is background job.
IsBackgroundRequest(ctx context.Context, resourceGroupName, requestResource string) bool
}

// ResourceGroupProvider provides some api to interact with resource manager server.
Expand Down Expand Up @@ -456,12 +458,8 @@ func (c *ResourceGroupsController) OnRequestWait(
) (*rmpb.Consumption, *rmpb.Consumption, error) {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
if err != nil {
failedRequestCounter.WithLabelValues(resourceGroupName).Inc()
return nil, nil, err
}
if gc.isBackgroundRequest(info.RequestSource()) {
return nil, nil, nil
}
return gc.onRequestWait(ctx, info)
}

Expand All @@ -474,12 +472,29 @@ func (c *ResourceGroupsController) OnResponse(
log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName))
return &rmpb.Consumption{}, nil
}
if tmp.(*groupCostController).isBackgroundRequest(req.RequestSource()) {
return nil, nil
}
return tmp.(*groupCostController).onResponse(req, resp)
}

// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.
func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context,
resourceGroupName, requestResource string) bool {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
if err != nil {
failedRequestCounter.WithLabelValues(resourceGroupName).Inc()
return false
}

if bg := gc.meta.BackgroundSettings; bg != nil {
if len(requestResource) == 0 || len(bg.JobTypes) == 0 {
return false
}
if idx := strings.LastIndex(requestResource, "_"); idx != -1 {
return slices.Contains(bg.JobTypes, requestResource[idx+1:])
}
}
return false
}

// GetResourceGroup returns the meta setting of the given resource group name.
func (c *ResourceGroupsController) GetResourceGroup(resourceGroupName string) (*rmpb.ResourceGroup, error) {
gc, err := c.tryGetResourceGroup(c.loopCtx, resourceGroupName)
Expand Down Expand Up @@ -1052,22 +1067,6 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType
return req
}

// If the resource group has background jobs, we should not record consumption and wait for it.
func (gc *groupCostController) isBackgroundRequest(requestResource string) bool {
gc.metaLock.Lock()
defer gc.metaLock.Unlock()

if bg := gc.meta.BackgroundSettings; bg != nil {
if len(requestResource) == 0 || len(bg.JobTypes) == 0 {
return false
}
if idx := strings.LastIndex(requestResource, "_"); idx != -1 {
return slices.Contains(bg.JobTypes, requestResource[idx+1:])
}
}
return false
}

func (gc *groupCostController) getMeta() *rmpb.ResourceGroup {
gc.metaLock.Lock()
defer gc.metaLock.Unlock()
Expand Down
11 changes: 0 additions & 11 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,3 @@ func TestRequestAndResponseConsumption(t *testing.T) {
re.Equal(expectedConsumption.TotalCpuTimeMs, consumption.TotalCpuTimeMs, caseNum)
}
}

func TestIsBackgroundRequest(t *testing.T) {
re := require.New(t)
gc := createTestGroupCostController(re)
re.False(gc.isBackgroundRequest("test"))
re.False(gc.isBackgroundRequest("unknown_default"))
re.True(gc.isBackgroundRequest("internal_lightning"))
re.False(gc.isBackgroundRequest("internal_lightning_default"))
re.False(gc.isBackgroundRequest("external_test_default"))
re.True(gc.isBackgroundRequest("external_unknown_lightning"))
}
38 changes: 9 additions & 29 deletions tests/integrations/mcs/resourcemanager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,15 +1044,16 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMo
rruTokensAtATime: 0,
wruTokensAtATime: 2,
}
controller.OnRequestWait(suite.ctx, "modetest", tc.makeWriteRequest())
resourceName := "modetest"
controller.OnRequestWait(suite.ctx, resourceName, tc.makeWriteRequest())
time.Sleep(time.Second * 2)
beginTime := time.Now()
// This is used to make sure resource group in lowRU.
for i := 0; i < 100; i++ {
controller.OnRequestWait(suite.ctx, "modetest", tc2.makeWriteRequest())
controller.OnRequestWait(suite.ctx, resourceName, tc2.makeWriteRequest())
}
for i := 0; i < 100; i++ {
controller.OnRequestWait(suite.ctx, "modetest", tc.makeWriteRequest())
controller.OnRequestWait(suite.ctx, resourceName, tc.makeWriteRequest())
}
endTime := time.Now()
// we can not check `inDegradedMode` because of data race.
Expand Down Expand Up @@ -1165,35 +1166,14 @@ func (suite *resourceManagerClientTestSuite) TestSkipConsumptionForBackgroundJob

resourceGroupName := suite.initGroups[1].Name
req := controller.NewTestRequestInfo(false, 0, 1)
resp := controller.NewTestResponseInfo(0, time.Duration(30), true)
consumption, penalty, err := c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.NotNil(consumption)
re.NotNil(penalty)
consumption, err = c.OnResponse(resourceGroupName, req, resp)
re.NotNil(consumption)
re.NoError(err)
re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, req.RequestSource()))

resourceGroupName = "background_job"
// Check background job `br` will not consume tokens.
req.SetRequestSource("br")
consumption, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Nil(consumption)
re.Nil(penalty)
consumption, err = c.OnResponse(resourceGroupName, req, resp)
re.Nil(consumption)
re.NoError(err)
req.SetRequestSource("internal_br")
re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, req.RequestSource()))

// Check background job `lightning` will not consume tokens.
req.SetRequestSource("lightning")
consumption, penalty, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Nil(consumption)
re.Nil(penalty)
consumption, err = c.OnResponse(resourceGroupName, req, resp)
re.Nil(consumption)
re.NoError(err)
req.SetRequestSource("internal_lightning")
re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, req.RequestSource()))

c.Stop()
}

0 comments on commit 242132e

Please sign in to comment.