From ae95a2f876018b7738d9e65f84d56352875c6175 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 11 Jul 2024 14:42:17 +0800 Subject: [PATCH] Use the default resource group if the requested one doesn't exist Signed-off-by: JmPotato --- client/errs/errno.go | 2 +- .../resource_group/controller/controller.go | 87 +++++++++++++------ .../controller/controller_test.go | 64 ++++++++++---- .../resourcemanager/resource_manager_test.go | 40 +++++++-- 4 files changed, 143 insertions(+), 50 deletions(-) diff --git a/client/errs/errno.go b/client/errs/errno.go index 0dbcb4fe147a..95c6bffdfa41 100644 --- a/client/errs/errno.go +++ b/client/errs/errno.go @@ -100,5 +100,5 @@ type ErrClientGetResourceGroup struct { } func (e *ErrClientGetResourceGroup) Error() string { - return fmt.Sprintf("get resource group %v failed, %v", e.ResourceGroupName, e.Cause) + return fmt.Sprintf("get resource group %s failed, %s", e.ResourceGroupName, e.Cause) } diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index d25eaee88f0b..c0fa3c5e3a98 100644 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -38,6 +38,7 @@ import ( ) const ( + defaultResourceGroupName = "default" controllerConfigPath = "resource_group/controller" maxNotificationChanLen = 200 needTokensAmplification = 1.1 @@ -356,8 +357,8 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { if err = proto.Unmarshal(item.Kv.Value, group); err != nil { continue } - if item, ok := c.groupsController.Load(group.Name); ok { - gc := item.(*groupCostController) + gc, ok := c.loadGroupController(group.Name) + if ok { gc.modifyMeta(group) } case meta_storagepb.Event_DELETE: @@ -365,13 +366,20 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil { continue } - if _, ok := c.groupsController.LoadAndDelete(group.Name); ok { + // Do not delete the resource group immediately, just mark it as tombstone. + // For the requests that are still in progress, fallback to the default resource group. + gc, ok := c.loadGroupController(group.Name) + if ok { + gc.tombstone.Store(true) resourceGroupStatusGauge.DeleteLabelValues(group.Name, group.Name) + resourceGroupStatusGauge.WithLabelValues(group.Name, defaultResourceGroupName).Set(1) + log.Info("[resource group controller] mark resource group as tombstone", zap.String("name", group.Name)) } } else { // Prev-kv is compacted means there must have been a delete event before this event, // which means that this is just a duplicated event, so we can just ignore it. - log.Info("previous key-value pair has been compacted", zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value))) + log.Info("[resource group controller] previous key-value pair has been compacted", + zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value))) } } } @@ -420,12 +428,32 @@ func (c *ResourceGroupsController) Stop() error { return nil } +// loadGroupController just wraps the `Load` method of `sync.Map`. +func (c *ResourceGroupsController) loadGroupController(name string) (*groupCostController, bool) { + tmp, ok := c.groupsController.Load(name) + if !ok { + return nil, false + } + return tmp.(*groupCostController), true +} + +// loadOrStoreGroupController just wraps the `LoadOrStore` method of `sync.Map`. +func (c *ResourceGroupsController) loadOrStoreGroupController(name string, gc *groupCostController) (*groupCostController, bool) { + tmp, loaded := c.groupsController.LoadOrStore(name, gc) + return tmp.(*groupCostController), loaded +} + // tryGetResourceGroup will try to get the resource group controller from local cache first, // if the local cache misses, it will then call gRPC to fetch the resource group info from server. func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name string) (*groupCostController, error) { // Get from the local cache first. - if tmp, ok := c.groupsController.Load(name); ok { - return tmp.(*groupCostController), nil + gc, ok := c.loadGroupController(name) + if ok { + // If the resource group is marked as tombstone, fallback to the default resource group. + if gc.tombstone.Load() { + return c.tryGetResourceGroup(ctx, defaultResourceGroupName) + } + return gc, nil } // Call gRPC to fetch the resource group info. group, err := c.provider.GetResourceGroup(ctx, name) @@ -436,24 +464,21 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name return nil, errors.Errorf("%s does not exists", name) } // Check again to prevent initializing the same resource group concurrently. - if tmp, ok := c.groupsController.Load(name); ok { - gc := tmp.(*groupCostController) + if gc, ok = c.loadGroupController(name); ok { return gc, nil } // Initialize the resource group controller. - gc, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan) + gc, err = newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan) if err != nil { return nil, err } - // TODO: re-init the state if user change mode from RU to RAW mode. - gc.initRunState() // Check again to prevent initializing the same resource group concurrently. - tmp, loaded := c.groupsController.LoadOrStore(group.GetName(), gc) + gc, loaded := c.loadOrStoreGroupController(group.Name, gc) if !loaded { resourceGroupStatusGauge.WithLabelValues(name, group.Name).Set(1) log.Info("[resource group controller] create resource group cost controller", zap.String("name", group.GetName())) } - return tmp.(*groupCostController), nil + return gc, nil } func (c *ResourceGroupsController) cleanUpResourceGroup() { @@ -465,14 +490,15 @@ func (c *ResourceGroupsController) cleanUpResourceGroup() { latestConsumption := *gc.mu.consumption gc.mu.Unlock() if equalRU(latestConsumption, *gc.run.consumption) { - if gc.tombstone { + if gc.inactive || gc.tombstone.Load() { c.groupsController.Delete(resourceGroupName) resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, resourceGroupName) + resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName, defaultResourceGroupName) return true } - gc.tombstone = true + gc.inactive = true } else { - gc.tombstone = false + gc.inactive = false } return true }) @@ -498,12 +524,11 @@ func (c *ResourceGroupsController) handleTokenBucketResponse(resp []*rmpb.TokenB c.run.inDegradedMode = false for _, res := range resp { name := res.GetResourceGroupName() - v, ok := c.groupsController.Load(name) + gc, ok := c.loadGroupController(name) if !ok { log.Warn("[resource group controller] a non-existent resource group was found when handle token response", zap.String("name", name)) continue } - gc := v.(*groupCostController) gc.handleTokenBucketResponse(res) } } @@ -572,12 +597,16 @@ func (c *ResourceGroupsController) OnRequestWait( func (c *ResourceGroupsController) OnResponse( resourceGroupName string, req RequestInfo, resp ResponseInfo, ) (*rmpb.Consumption, error) { - tmp, ok := c.groupsController.Load(resourceGroupName) + gc, ok := c.loadGroupController(resourceGroupName) if !ok { + // If the resource group does not exist, use the default resource group. + if resourceGroupName != defaultResourceGroupName { + return c.OnResponse(defaultResourceGroupName, req, resp) + } log.Warn("[resource group controller] resource group name does not exist", zap.String("name", resourceGroupName)) return &rmpb.Consumption{}, nil } - return tmp.(*groupCostController).onResponse(req, resp) + return gc.onResponse(req, resp) } // IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it. @@ -594,8 +623,7 @@ func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context, func (c *ResourceGroupsController) checkBackgroundSettings(ctx context.Context, bg *rmpb.BackgroundSettings, requestResource string) bool { // fallback to default resource group. if bg == nil { - resourceGroupName := "default" - gc, err := c.tryGetResourceGroup(ctx, resourceGroupName) + gc, err := c.tryGetResourceGroup(ctx, defaultResourceGroupName) if err != nil { return false } @@ -681,7 +709,10 @@ type groupCostController struct { requestUnitTokens map[rmpb.RequestUnitType]*tokenCounter } - tombstone bool + // tombstone is set to true when the resource group is deleted. + tombstone atomic.Bool + // inactive is set to true when the resource group has not been updated for a long time. + inactive bool } type groupMetricsCollection struct { @@ -774,6 +805,8 @@ func newGroupCostController( gc.mu.consumption = &rmpb.Consumption{} gc.mu.storeCounter = make(map[uint64]*rmpb.Consumption) gc.mu.globalCounter = &rmpb.Consumption{} + // TODO: re-init the state if user change mode from RU to RAW mode. + gc.initRunState() return gc, nil } @@ -1359,14 +1392,14 @@ func (gc *groupCostController) onResponse( return delta, nil } -// GetActiveResourceGroup is used to get action resource group. +// GetActiveResourceGroup is used to get active resource group. // This is used for test only. func (c *ResourceGroupsController) GetActiveResourceGroup(resourceGroupName string) *rmpb.ResourceGroup { - tmp, ok := c.groupsController.Load(resourceGroupName) - if !ok { + gc, ok := c.loadGroupController(resourceGroupName) + if !ok || gc.tombstone.Load() { return nil } - return tmp.(*groupCostController).getMeta() + return gc.getMeta() } // This is used for test only. diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index 3300edf700f5..b85f951920d6 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -59,7 +59,6 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController func TestGroupControlBurstable(t *testing.T) { re := require.New(t) gc := createTestGroupCostController(re) - gc.initRunState() args := tokenBucketReconfigureArgs{ NewRate: 1000, NewBurst: -1, @@ -74,7 +73,6 @@ func TestGroupControlBurstable(t *testing.T) { func TestRequestAndResponseConsumption(t *testing.T) { re := require.New(t) gc := createTestGroupCostController(re) - gc.initRunState() testCases := []struct { req *TestRequestInfo resp *TestResponseInfo @@ -126,7 +124,6 @@ func TestRequestAndResponseConsumption(t *testing.T) { func TestResourceGroupThrottledError(t *testing.T) { re := require.New(t) gc := createTestGroupCostController(re) - gc.initRunState() req := &TestRequestInfo{ isWrite: true, writeBytes: 10000000, @@ -142,6 +139,14 @@ type MockResourceGroupProvider struct { mock.Mock } +func newMockResourceGroupProvider() *MockResourceGroupProvider { + mockProvider := &MockResourceGroupProvider{} + mockProvider.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&meta_storagepb.GetResponse{}, nil) + mockProvider.On("LoadResourceGroups", mock.Anything).Return([]*rmpb.ResourceGroup{}, int64(0), nil) + mockProvider.On("Watch", mock.Anything, mock.Anything, mock.Anything).Return(make(chan []*meta_storagepb.Event), nil) + return mockProvider +} + func (m *MockResourceGroupProvider) GetResourceGroup(ctx context.Context, resourceGroupName string, opts ...pd.GetResourceGroupOption) (*rmpb.ResourceGroup, error) { args := m.Called(ctx, resourceGroupName, opts) return args.Get(0).(*rmpb.ResourceGroup), args.Error(1) @@ -191,28 +196,22 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - mockProvider := new(MockResourceGroupProvider) - mockProvider.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&meta_storagepb.GetResponse{}, nil) - // LoadResourceGroups - mockProvider.On("LoadResourceGroups", mock.Anything).Return([]*rmpb.ResourceGroup{}, int64(0), nil) - // Watch - mockProvider.On("Watch", mock.Anything, mock.Anything, mock.Anything).Return(make(chan []*meta_storagepb.Event), nil) - - re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport", fmt.Sprintf("return(\"%s\")", "default"))) + re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport", fmt.Sprintf("return(\"%s\")", defaultResourceGroupName))) defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport") re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport", fmt.Sprintf("return(\"%s\")", "test-group"))) defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport") + mockProvider := newMockResourceGroupProvider() controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil) controller.Start(ctx) - defaultResourceGroup := &rmpb.ResourceGroup{Name: "default", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}} + defaultResourceGroup := &rmpb.ResourceGroup{Name: defaultResourceGroupName, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}} testResourceGroup := &rmpb.ResourceGroup{Name: "test-group", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}} - mockProvider.On("GetResourceGroup", mock.Anything, "default", mock.Anything).Return(defaultResourceGroup, nil) + mockProvider.On("GetResourceGroup", mock.Anything, defaultResourceGroupName, mock.Anything).Return(defaultResourceGroup, nil) mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil) - c1, err := controller.tryGetResourceGroup(ctx, "default") + c1, err := controller.tryGetResourceGroup(ctx, defaultResourceGroupName) re.NoError(err) re.Equal(defaultResourceGroup, c1.meta) @@ -226,11 +225,11 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) { request := args.Get(1).(*rmpb.TokenBucketsRequest) var responses []*rmpb.TokenBucketResponse for _, req := range request.Requests { - if req.ResourceGroupName == "default" { + if req.ResourceGroupName == defaultResourceGroupName { // no response the default group request, that's mean `len(c.run.currentRequests) != 0` always. time.Sleep(100 * time.Second) responses = append(responses, &rmpb.TokenBucketResponse{ - ResourceGroupName: "default", + ResourceGroupName: defaultResourceGroupName, GrantedRUTokens: []*rmpb.GrantedRUTokenBucket{ { GrantedTokens: &rmpb.TokenBucket{ @@ -271,3 +270,36 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) { re.Fail("timeout") } } + +func TestGetController(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockProvider := newMockResourceGroupProvider() + controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil) + controller.Start(ctx) + + defaultResourceGroup := &rmpb.ResourceGroup{Name: defaultResourceGroupName, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}} + testResourceGroup := &rmpb.ResourceGroup{Name: "test-group", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}} + mockProvider.On("GetResourceGroup", mock.Anything, defaultResourceGroupName, mock.Anything).Return(defaultResourceGroup, nil) + mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil) + mockProvider.On("GetResourceGroup", mock.Anything, "test-group-non-existent", mock.Anything).Return((*rmpb.ResourceGroup)(nil), nil) + + c, err := controller.GetResourceGroup("test-group-non-existent") + re.Error(err) + re.Nil(c) + c, err = controller.GetResourceGroup(defaultResourceGroupName) + re.NoError(err) + re.Equal(defaultResourceGroup, c) + c, err = controller.GetResourceGroup("test-group") + re.NoError(err) + re.Equal(testResourceGroup, c) + // Mark the tombstone manually to test the fallback case. + gc, err := controller.tryGetResourceGroup(ctx, "test-group") + re.NoError(err) + gc.tombstone.Store(true) + c, err = controller.GetResourceGroup("test-group") + re.NoError(err) + re.Equal(defaultResourceGroup, c) +} diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index c8cee89cf32e..a0c72d1f5d18 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -269,7 +269,8 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { testutil.Eventually(re, func() bool { name := groupNamePrefix + strconv.Itoa(i) meta = controller.GetActiveResourceGroup(name) - return meta == nil + // The deleted resource group may not be immediately removed from the controller. + return meta == nil || meta.Name == "default" }, testutil.WithTickInterval(50*time.Millisecond)) } } @@ -401,8 +402,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { CPUMsCost: 1, } - controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, controller.EnableSingleGroupByKeyspace()) + controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg) controller.Start(suite.ctx) + defer controller.Stop() testCases := []struct { resourceGroupName string @@ -463,9 +465,20 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { wreq := tcs.makeWriteRequest() _, _, _, _, err = controller.OnRequestWait(suite.ctx, rg.Name, wreq) re.Error(err) - time.Sleep(time.Millisecond * 200) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate")) - controller.Stop() + + group, err := controller.GetResourceGroup(rg.Name) + re.NoError(err) + re.Equal(rg, group) + resp, err = cli.DeleteResourceGroup(suite.ctx, rg.Name) + re.NoError(err) + re.Contains(resp, "Success!") + // Make sure the resource group is watched by the controller and marked as tombstone. + testutil.Eventually(re, func() bool { + gc, err := controller.GetResourceGroup(rg.Name) + re.NoError(err) + return gc.GetName() == "default" + }, testutil.WithTickInterval(50*time.Millisecond)) } // TestSwitchBurst is used to test https://github.com/tikv/pd/issues/6209 @@ -1276,6 +1289,11 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() { resp, err := cli.AddResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") + group2 := *group + group2.Name = "tombstone_test" + resp, err = cli.AddResourceGroup(suite.ctx, &group2) + re.NoError(err) + re.Contains(resp, "Success!") re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/fastCleanup", `return(true)`)) controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) @@ -1298,9 +1316,19 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() { controller.OnResponse(group.Name, rreq, rres) time.Sleep(100 * time.Microsecond) } - time.Sleep(1 * time.Second) + testutil.Eventually(re, func() bool { + meta := controller.GetActiveResourceGroup(group.Name) + return meta == nil + }, testutil.WithTickInterval(50*time.Millisecond)) - re.Nil(controller.GetActiveResourceGroup(group.Name)) + // Mock server deleted the resource group + resp, err = cli.DeleteResourceGroup(suite.ctx, group2.Name) + re.NoError(err) + re.Contains(resp, "Success!") + testutil.Eventually(re, func() bool { + meta := controller.GetActiveResourceGroup(group2.Name) + return meta == nil + }, testutil.WithTickInterval(50*time.Millisecond)) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/fastCleanup")) controller.Stop()