diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 528369df229..e3495a21ff1 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -234,7 +234,8 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { cfgRevision := resp.GetHeader().GetRevision() var watchMetaChannel, watchConfigChannel chan []*meta_storagepb.Event if !c.ruConfig.isSingleGroupByKeyspace { - watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix()) + // Use WithPrevKV() to get the previous key-value pair when get Delete Event. + watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV()) if err != nil { log.Warn("watch resource group meta failed", zap.Error(err)) } @@ -260,7 +261,8 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { } case <-watchRetryTimer.C: if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil { - watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix()) + // Use WithPrevKV() to get the previous key-value pair when get Delete Event. + watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV()) if err != nil { log.Warn("watch resource group meta failed", zap.Error(err)) watchRetryTimer.Reset(watchRetryInterval) @@ -319,18 +321,27 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { for _, item := range resp { metaRevision = item.Kv.ModRevision group := &rmpb.ResourceGroup{} - if err := proto.Unmarshal(item.Kv.Value, group); err != nil { - continue - } switch item.Type { case meta_storagepb.Event_PUT: + if err = proto.Unmarshal(item.Kv.Value, group); err != nil { + continue + } if item, ok := c.groupsController.Load(group.Name); ok { gc := item.(*groupCostController) gc.modifyMeta(group) } case meta_storagepb.Event_DELETE: - if _, ok := c.groupsController.LoadAndDelete(group.Name); ok { - resourceGroupStatusGauge.DeleteLabelValues(group.Name) + if item.PrevKv != nil { + if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil { + continue + } + if _, ok := c.groupsController.LoadAndDelete(group.Name); ok { + resourceGroupStatusGauge.DeleteLabelValues(group.Name) + } + } else { + // Prev-kv is compacted means there must have been a delete event before this event, + // which means that this is just a duplicated event, so we can just ignore it. + log.Info("previous key-value pair has been compacted", zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value))) } } } diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go index 68b2de66ae2..30944308584 100644 --- a/client/resource_manager_client.go +++ b/client/resource_manager_client.go @@ -55,7 +55,6 @@ type ResourceManagerClient interface { ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) - WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) Watch(ctx context.Context, key []byte, opts ...OpOption) (chan []*meta_storagepb.Event, error) } @@ -188,52 +187,6 @@ func (c *client) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, return groups, resp.Header.Revision, nil } -// WatchResourceGroup [just for TEST] watches resource groups changes. -// It returns a stream of slices of resource groups. -// The first message in stream contains all current resource groups, -// all subsequent messages contains new events[PUT/DELETE] for all resource groups. -func (c *client) WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error) { - configChan, err := c.Watch(ctx, GroupSettingsPathPrefixBytes, WithRev(revision), WithPrefix()) - if err != nil { - return nil, err - } - resourceGroupWatcherChan := make(chan []*rmpb.ResourceGroup) - go func() { - defer func() { - close(resourceGroupWatcherChan) - if r := recover(); r != nil { - log.Error("[pd] panic in ResourceManagerClient `WatchResourceGroups`", zap.Any("error", r)) - return - } - }() - for { - select { - case <-ctx.Done(): - return - case res, ok := <-configChan: - if !ok { - return - } - groups := make([]*rmpb.ResourceGroup, 0, len(res)) - for _, item := range res { - switch item.Type { - case meta_storagepb.Event_PUT: - group := &rmpb.ResourceGroup{} - if err := proto.Unmarshal(item.Kv.Value, group); err != nil { - return - } - groups = append(groups, group) - case meta_storagepb.Event_DELETE: - continue - } - } - resourceGroupWatcherChan <- groups - } - } - }() - return resourceGroupWatcherChan, err -} - func (c *client) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) { req := &tokenRequest{ done: make(chan error, 1), diff --git a/server/grpc_service.go b/server/grpc_service.go index 5e40bc1c732..dd53416d30d 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -2638,7 +2638,7 @@ func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, serve } else { // Prev-kv is compacted means there must have been a delete event before this event, // which means that this is just a duplicated event, so we can just ignore it. - log.Info("previous key-value pair has been compacted", zap.String("previous key", string(e.Kv.Key))) + log.Info("previous key-value pair has been compacted", zap.String("required-key", string(e.Kv.Key))) } } } diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index 0be18d1bbd3..6da7bd3aac1 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -205,51 +205,25 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { }, }, } - // Mock get revision by listing - for i := 0; i < 3; i++ { - group.Name += strconv.Itoa(i) - resp, err := cli.AddResourceGroup(suite.ctx, group) - group.Name = "test" - re.NoError(err) - re.Contains(resp, "Success!") - } - lresp, revision, err := cli.LoadResourceGroups(suite.ctx) - re.NoError(err) - re.Equal(len(lresp), 4) - re.Greater(revision, int64(0)) - tcs := tokenConsumptionPerSecond{rruTokensAtATime: 100} - re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/disableWatch", "return(true)")) - defer func() { - re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/disableWatch")) - }() - controllerKeySpace, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil, controller.EnableSingleGroupByKeyspace()) controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) controller.Start(suite.ctx) defer controller.Stop() - controller.OnRequestWait(suite.ctx, "test0", tcs.makeReadRequest()) - meta := controller.GetActiveResourceGroup("test0") - metaShadow, err := controller.GetResourceGroup("test0") - re.NoError(err) - re.Equal(meta.RUSettings.RU, group.RUSettings.RU) - re.Equal(metaShadow.RUSettings.RU, group.RUSettings.RU) - - controllerKeySpace.OnRequestWait(suite.ctx, "test0", tcs.makeReadRequest()) - metaKeySpace := controllerKeySpace.GetActiveResourceGroup("test0") - re.Equal(metaKeySpace.RUSettings.RU, group.RUSettings.RU) - controller.OnRequestWait(suite.ctx, "test1", tcs.makeReadRequest()) - meta = controller.GetActiveResourceGroup("test1") - metaShadow, err = controller.GetResourceGroup("test1") - re.NoError(err) - re.Equal(meta.RUSettings.RU, group.RUSettings.RU) - re.Equal(metaShadow.RUSettings.RU, group.RUSettings.RU) - suite.NoError(err) // Mock add resource groups - for i := 3; i < 9; i++ { + var meta *rmpb.ResourceGroup + groupsNum := 10 + for i := 0; i < groupsNum; i++ { group.Name = "test" + strconv.Itoa(i) resp, err := cli.AddResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") + + // Make sure the resource group active + meta, err = controller.GetResourceGroup(group.Name) + re.NotNil(meta) + re.NoError(err) + meta = controller.GetActiveResourceGroup(group.Name) + re.NotNil(meta) } // Mock modify resource groups modifySettings := func(gs *rmpb.ResourceGroup) { @@ -261,65 +235,97 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { }, } } - re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/watchStreamError", "return(true)")) - for i := 0; i < 2; i++ { - if i == 1 { - testutil.Eventually(re, func() bool { - meta = controller.GetActiveResourceGroup("test0") - return meta.RUSettings.RU.Settings.FillRate == uint64(20000) - }, testutil.WithTickInterval(50*time.Millisecond)) - metaKeySpace = controllerKeySpace.GetActiveResourceGroup("test0") - re.Equal(metaKeySpace.RUSettings.RU.Settings.FillRate, uint64(10000)) - re.NoError(failpoint.Enable("github.com/tikv/pd/client/watchStreamError", "return(true)")) - } + for i := 0; i < groupsNum; i++ { group.Name = "test" + strconv.Itoa(i) modifySettings(group) resp, err := cli.ModifyResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") } - time.Sleep(time.Millisecond * 50) - meta = controller.GetActiveResourceGroup("test1") - re.Equal(meta.RUSettings.RU.Settings.FillRate, uint64(10000)) - re.NoError(failpoint.Disable("github.com/tikv/pd/client/watchStreamError")) + for i := 0; i < groupsNum; i++ { + testutil.Eventually(re, func() bool { + name := "test" + strconv.Itoa(i) + meta = controller.GetActiveResourceGroup(name) + if meta != nil { + return meta.RUSettings.RU.Settings.FillRate == uint64(20000) + } + return false + }, testutil.WithTickInterval(50*time.Millisecond)) + } + + // Mock reset watch stream + re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/watchStreamError", "return(true)")) + group.Name = "test" + strconv.Itoa(groupsNum) + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + // Make sure the resource group active + meta, err = controller.GetResourceGroup(group.Name) + re.NotNil(meta) + re.NoError(err) + modifySettings(group) + resp, err = cli.ModifyResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") testutil.Eventually(re, func() bool { - meta = controller.GetActiveResourceGroup("test1") + meta = controller.GetActiveResourceGroup(group.Name) return meta.RUSettings.RU.Settings.FillRate == uint64(20000) }, testutil.WithTickInterval(100*time.Millisecond)) re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/watchStreamError")) - for i := 2; i < 9; i++ { - group.Name = "test" + strconv.Itoa(i) - modifySettings(group) - resp, err := cli.ModifyResourceGroup(suite.ctx, group) - re.NoError(err) - re.Contains(resp, "Success!") - } // Mock delete resource groups suite.cleanupResourceGroups() - time.Sleep(time.Second) - meta = controller.GetActiveResourceGroup(group.Name) - re.Nil(meta) + for i := 0; i < groupsNum; i++ { + testutil.Eventually(re, func() bool { + name := "test" + strconv.Itoa(i) + meta = controller.GetActiveResourceGroup(name) + return meta == nil + }, testutil.WithTickInterval(50*time.Millisecond)) + } +} - // Check watch result - watchChan, err := suite.client.WatchResourceGroup(suite.ctx, revision) - re.NoError(err) - i := 0 - for { - select { - case <-time.After(time.Second): - return - case res := <-watchChan: - for _, r := range res { - if i < 6 { - suite.Equal(uint64(10000), r.RUSettings.RU.Settings.FillRate) - } else { - suite.Equal(uint64(20000), r.RUSettings.RU.Settings.FillRate) - } - i++ - } - } +func (suite *resourceManagerClientTestSuite) TestWatchWithSingleGroupByKeyspace() { + re := suite.Require() + cli := suite.client + + // We need to disable watch stream for `isSingleGroupByKeyspace`. + re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/disableWatch", "return(true)")) + defer func() { + re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/disableWatch")) + }() + // Distinguish the controller with and without enabling `isSingleGroupByKeyspace`. + controllerKeySpace, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil, controller.EnableSingleGroupByKeyspace()) + controller, _ := controller.NewResourceGroupController(suite.ctx, 2, cli, nil) + controller.Start(suite.ctx) + controllerKeySpace.Start(suite.ctx) + defer controllerKeySpace.Stop() + defer controller.Stop() + + // Mock add resource group. + group := &rmpb.ResourceGroup{ + Name: "test", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: 10000, + }, + Tokens: 100000, + }, + }, } + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + + tcs := tokenConsumptionPerSecond{rruTokensAtATime: 100} + controller.OnRequestWait(suite.ctx, group.Name, tcs.makeReadRequest()) + meta := controller.GetActiveResourceGroup(group.Name) + re.Equal(meta.RUSettings.RU, group.RUSettings.RU) + + controllerKeySpace.OnRequestWait(suite.ctx, group.Name, tcs.makeReadRequest()) + metaKeySpace := controllerKeySpace.GetActiveResourceGroup(group.Name) + re.Equal(metaKeySpace.RUSettings.RU, group.RUSettings.RU) } const buffDuration = time.Millisecond * 300