Skip to content

Commit

Permalink
Use the default resource group if the requested one doesn't exist
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Jul 12, 2024
1 parent c50993b commit ae95a2f
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 50 deletions.
2 changes: 1 addition & 1 deletion client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,5 @@ type ErrClientGetResourceGroup struct {
}

func (e *ErrClientGetResourceGroup) Error() string {
return fmt.Sprintf("get resource group %v failed, %v", e.ResourceGroupName, e.Cause)
return fmt.Sprintf("get resource group %s failed, %s", e.ResourceGroupName, e.Cause)
}
87 changes: 60 additions & 27 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
)

const (
defaultResourceGroupName = "default"
controllerConfigPath = "resource_group/controller"
maxNotificationChanLen = 200
needTokensAmplification = 1.1
Expand Down Expand Up @@ -356,22 +357,29 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
if err = proto.Unmarshal(item.Kv.Value, group); err != nil {
continue
}
if item, ok := c.groupsController.Load(group.Name); ok {
gc := item.(*groupCostController)
gc, ok := c.loadGroupController(group.Name)
if ok {
gc.modifyMeta(group)
}
case meta_storagepb.Event_DELETE:
if item.PrevKv != nil {
if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil {
continue
}
if _, ok := c.groupsController.LoadAndDelete(group.Name); ok {
// Do not delete the resource group immediately, just mark it as tombstone.
// For the requests that are still in progress, fallback to the default resource group.
gc, ok := c.loadGroupController(group.Name)
if ok {
gc.tombstone.Store(true)
resourceGroupStatusGauge.DeleteLabelValues(group.Name, group.Name)
resourceGroupStatusGauge.WithLabelValues(group.Name, defaultResourceGroupName).Set(1)
log.Info("[resource group controller] mark resource group as tombstone", zap.String("name", group.Name))
}
} else {
// Prev-kv is compacted means there must have been a delete event before this event,
// which means that this is just a duplicated event, so we can just ignore it.
log.Info("previous key-value pair has been compacted", zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value)))
log.Info("[resource group controller] previous key-value pair has been compacted",
zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value)))

Check warning on line 382 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L381-L382

Added lines #L381 - L382 were not covered by tests
}
}
}
Expand Down Expand Up @@ -420,12 +428,32 @@ func (c *ResourceGroupsController) Stop() error {
return nil
}

// loadGroupController just wraps the `Load` method of `sync.Map`.
func (c *ResourceGroupsController) loadGroupController(name string) (*groupCostController, bool) {
tmp, ok := c.groupsController.Load(name)
if !ok {
return nil, false
}
return tmp.(*groupCostController), true
}

// loadOrStoreGroupController just wraps the `LoadOrStore` method of `sync.Map`.
func (c *ResourceGroupsController) loadOrStoreGroupController(name string, gc *groupCostController) (*groupCostController, bool) {
tmp, loaded := c.groupsController.LoadOrStore(name, gc)
return tmp.(*groupCostController), loaded
}

// tryGetResourceGroup will try to get the resource group controller from local cache first,
// if the local cache misses, it will then call gRPC to fetch the resource group info from server.
func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name string) (*groupCostController, error) {
// Get from the local cache first.
if tmp, ok := c.groupsController.Load(name); ok {
return tmp.(*groupCostController), nil
gc, ok := c.loadGroupController(name)
if ok {
// If the resource group is marked as tombstone, fallback to the default resource group.
if gc.tombstone.Load() {
return c.tryGetResourceGroup(ctx, defaultResourceGroupName)
}
return gc, nil
}
// Call gRPC to fetch the resource group info.
group, err := c.provider.GetResourceGroup(ctx, name)
Expand All @@ -436,24 +464,21 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name
return nil, errors.Errorf("%s does not exists", name)
}
// Check again to prevent initializing the same resource group concurrently.
if tmp, ok := c.groupsController.Load(name); ok {
gc := tmp.(*groupCostController)
if gc, ok = c.loadGroupController(name); ok {
return gc, nil
}
// Initialize the resource group controller.
gc, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
gc, err = newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
if err != nil {
return nil, err
}
// TODO: re-init the state if user change mode from RU to RAW mode.
gc.initRunState()
// Check again to prevent initializing the same resource group concurrently.
tmp, loaded := c.groupsController.LoadOrStore(group.GetName(), gc)
gc, loaded := c.loadOrStoreGroupController(group.Name, gc)
if !loaded {
resourceGroupStatusGauge.WithLabelValues(name, group.Name).Set(1)
log.Info("[resource group controller] create resource group cost controller", zap.String("name", group.GetName()))
}
return tmp.(*groupCostController), nil
return gc, nil
}

func (c *ResourceGroupsController) cleanUpResourceGroup() {
Expand All @@ -465,14 +490,15 @@ func (c *ResourceGroupsController) cleanUpResourceGroup() {
latestConsumption := *gc.mu.consumption
gc.mu.Unlock()
if equalRU(latestConsumption, *gc.run.consumption) {
if gc.tombstone {
if gc.inactive || gc.tombstone.Load() {
c.groupsController.Delete(resourceGroupName)
resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, resourceGroupName)
resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, defaultResourceGroupName)
return true
}
gc.tombstone = true
gc.inactive = true
} else {
gc.tombstone = false
gc.inactive = false
}
return true
})
Expand All @@ -498,12 +524,11 @@ func (c *ResourceGroupsController) handleTokenBucketResponse(resp []*rmpb.TokenB
c.run.inDegradedMode = false
for _, res := range resp {
name := res.GetResourceGroupName()
v, ok := c.groupsController.Load(name)
gc, ok := c.loadGroupController(name)
if !ok {
log.Warn("[resource group controller] a non-existent resource group was found when handle token response", zap.String("name", name))
continue
}
gc := v.(*groupCostController)
gc.handleTokenBucketResponse(res)
}
}
Expand Down Expand Up @@ -572,12 +597,16 @@ func (c *ResourceGroupsController) OnRequestWait(
func (c *ResourceGroupsController) OnResponse(
resourceGroupName string, req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, error) {
tmp, ok := c.groupsController.Load(resourceGroupName)
gc, ok := c.loadGroupController(resourceGroupName)
if !ok {
// If the resource group does not exist, use the default resource group.
if resourceGroupName != defaultResourceGroupName {
return c.OnResponse(defaultResourceGroupName, req, resp)
}
log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName))
return &rmpb.Consumption{}, nil
}
return tmp.(*groupCostController).onResponse(req, resp)
return gc.onResponse(req, resp)
}

// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.
Expand All @@ -594,8 +623,7 @@ func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context,
func (c *ResourceGroupsController) checkBackgroundSettings(ctx context.Context, bg *rmpb.BackgroundSettings, requestResource string) bool {
// fallback to default resource group.
if bg == nil {
resourceGroupName := "default"
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
gc, err := c.tryGetResourceGroup(ctx, defaultResourceGroupName)
if err != nil {
return false
}
Expand Down Expand Up @@ -681,7 +709,10 @@ type groupCostController struct {
requestUnitTokens map[rmpb.RequestUnitType]*tokenCounter
}

tombstone bool
// tombstone is set to true when the resource group is deleted.
tombstone atomic.Bool
// inactive is set to true when the resource group has not been updated for a long time.
inactive bool
}

type groupMetricsCollection struct {
Expand Down Expand Up @@ -774,6 +805,8 @@ func newGroupCostController(
gc.mu.consumption = &rmpb.Consumption{}
gc.mu.storeCounter = make(map[uint64]*rmpb.Consumption)
gc.mu.globalCounter = &rmpb.Consumption{}
// TODO: re-init the state if user change mode from RU to RAW mode.
gc.initRunState()
return gc, nil
}

Expand Down Expand Up @@ -1359,14 +1392,14 @@ func (gc *groupCostController) onResponse(
return delta, nil
}

// GetActiveResourceGroup is used to get action resource group.
// GetActiveResourceGroup is used to get active resource group.
// This is used for test only.
func (c *ResourceGroupsController) GetActiveResourceGroup(resourceGroupName string) *rmpb.ResourceGroup {
tmp, ok := c.groupsController.Load(resourceGroupName)
if !ok {
gc, ok := c.loadGroupController(resourceGroupName)
if !ok || gc.tombstone.Load() {
return nil
}
return tmp.(*groupCostController).getMeta()
return gc.getMeta()
}

// This is used for test only.
Expand Down
64 changes: 48 additions & 16 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController
func TestGroupControlBurstable(t *testing.T) {
re := require.New(t)
gc := createTestGroupCostController(re)
gc.initRunState()
args := tokenBucketReconfigureArgs{
NewRate: 1000,
NewBurst: -1,
Expand All @@ -74,7 +73,6 @@ func TestGroupControlBurstable(t *testing.T) {
func TestRequestAndResponseConsumption(t *testing.T) {
re := require.New(t)
gc := createTestGroupCostController(re)
gc.initRunState()
testCases := []struct {
req *TestRequestInfo
resp *TestResponseInfo
Expand Down Expand Up @@ -126,7 +124,6 @@ func TestRequestAndResponseConsumption(t *testing.T) {
func TestResourceGroupThrottledError(t *testing.T) {
re := require.New(t)
gc := createTestGroupCostController(re)
gc.initRunState()
req := &TestRequestInfo{
isWrite: true,
writeBytes: 10000000,
Expand All @@ -142,6 +139,14 @@ type MockResourceGroupProvider struct {
mock.Mock
}

func newMockResourceGroupProvider() *MockResourceGroupProvider {
mockProvider := &MockResourceGroupProvider{}
mockProvider.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&meta_storagepb.GetResponse{}, nil)
mockProvider.On("LoadResourceGroups", mock.Anything).Return([]*rmpb.ResourceGroup{}, int64(0), nil)
mockProvider.On("Watch", mock.Anything, mock.Anything, mock.Anything).Return(make(chan []*meta_storagepb.Event), nil)
return mockProvider
}

func (m *MockResourceGroupProvider) GetResourceGroup(ctx context.Context, resourceGroupName string, opts ...pd.GetResourceGroupOption) (*rmpb.ResourceGroup, error) {
args := m.Called(ctx, resourceGroupName, opts)
return args.Get(0).(*rmpb.ResourceGroup), args.Error(1)
Expand Down Expand Up @@ -191,28 +196,22 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mockProvider := new(MockResourceGroupProvider)

mockProvider.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&meta_storagepb.GetResponse{}, nil)
// LoadResourceGroups
mockProvider.On("LoadResourceGroups", mock.Anything).Return([]*rmpb.ResourceGroup{}, int64(0), nil)
// Watch
mockProvider.On("Watch", mock.Anything, mock.Anything, mock.Anything).Return(make(chan []*meta_storagepb.Event), nil)

re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport", fmt.Sprintf("return(\"%s\")", "default")))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport", fmt.Sprintf("return(\"%s\")", defaultResourceGroupName)))
defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport")
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport", fmt.Sprintf("return(\"%s\")", "test-group")))
defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport")

mockProvider := newMockResourceGroupProvider()
controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil)
controller.Start(ctx)

defaultResourceGroup := &rmpb.ResourceGroup{Name: "default", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}}
defaultResourceGroup := &rmpb.ResourceGroup{Name: defaultResourceGroupName, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}}
testResourceGroup := &rmpb.ResourceGroup{Name: "test-group", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}}
mockProvider.On("GetResourceGroup", mock.Anything, "default", mock.Anything).Return(defaultResourceGroup, nil)
mockProvider.On("GetResourceGroup", mock.Anything, defaultResourceGroupName, mock.Anything).Return(defaultResourceGroup, nil)
mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil)

c1, err := controller.tryGetResourceGroup(ctx, "default")
c1, err := controller.tryGetResourceGroup(ctx, defaultResourceGroupName)
re.NoError(err)
re.Equal(defaultResourceGroup, c1.meta)

Expand All @@ -226,11 +225,11 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) {
request := args.Get(1).(*rmpb.TokenBucketsRequest)
var responses []*rmpb.TokenBucketResponse
for _, req := range request.Requests {
if req.ResourceGroupName == "default" {
if req.ResourceGroupName == defaultResourceGroupName {
// no response the default group request, that's mean `len(c.run.currentRequests) != 0` always.
time.Sleep(100 * time.Second)
responses = append(responses, &rmpb.TokenBucketResponse{
ResourceGroupName: "default",
ResourceGroupName: defaultResourceGroupName,
GrantedRUTokens: []*rmpb.GrantedRUTokenBucket{
{
GrantedTokens: &rmpb.TokenBucket{
Expand Down Expand Up @@ -271,3 +270,36 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) {
re.Fail("timeout")
}
}

func TestGetController(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mockProvider := newMockResourceGroupProvider()
controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil)
controller.Start(ctx)

defaultResourceGroup := &rmpb.ResourceGroup{Name: defaultResourceGroupName, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}}
testResourceGroup := &rmpb.ResourceGroup{Name: "test-group", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}}
mockProvider.On("GetResourceGroup", mock.Anything, defaultResourceGroupName, mock.Anything).Return(defaultResourceGroup, nil)
mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil)
mockProvider.On("GetResourceGroup", mock.Anything, "test-group-non-existent", mock.Anything).Return((*rmpb.ResourceGroup)(nil), nil)

c, err := controller.GetResourceGroup("test-group-non-existent")
re.Error(err)
re.Nil(c)
c, err = controller.GetResourceGroup(defaultResourceGroupName)
re.NoError(err)
re.Equal(defaultResourceGroup, c)
c, err = controller.GetResourceGroup("test-group")
re.NoError(err)
re.Equal(testResourceGroup, c)
// Mark the tombstone manually to test the fallback case.
gc, err := controller.tryGetResourceGroup(ctx, "test-group")
re.NoError(err)
gc.tombstone.Store(true)
c, err = controller.GetResourceGroup("test-group")
re.NoError(err)
re.Equal(defaultResourceGroup, c)
}
Loading

0 comments on commit ae95a2f

Please sign in to comment.