Skip to content

Commit

Permalink
resource_control: watch delete with prev and refine test (#7092)
Browse files Browse the repository at this point in the history
close #7095

Signed-off-by: husharp <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and nolouch committed Dec 6, 2023
1 parent 22543de commit 63f7bd2
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 135 deletions.
25 changes: 18 additions & 7 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
cfgRevision := resp.GetHeader().GetRevision()
var watchMetaChannel, watchConfigChannel chan []*meta_storagepb.Event
if !c.ruConfig.isSingleGroupByKeyspace {
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix())
// Use WithPrevKV() to get the previous key-value pair when get Delete Event.
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
}
Expand All @@ -256,7 +257,8 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
}
case <-watchRetryTimer.C:
if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil {
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix())
// Use WithPrevKV() to get the previous key-value pair when get Delete Event.
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
watchRetryTimer.Reset(watchRetryInterval)
Expand Down Expand Up @@ -315,18 +317,27 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
for _, item := range resp {
metaRevision = item.Kv.ModRevision
group := &rmpb.ResourceGroup{}
if err := proto.Unmarshal(item.Kv.Value, group); err != nil {
continue
}
switch item.Type {
case meta_storagepb.Event_PUT:
if err = proto.Unmarshal(item.Kv.Value, group); err != nil {
continue
}
if item, ok := c.groupsController.Load(group.Name); ok {
gc := item.(*groupCostController)
gc.modifyMeta(group)
}
case meta_storagepb.Event_DELETE:
if _, ok := c.groupsController.LoadAndDelete(group.Name); ok {
resourceGroupStatusGauge.DeleteLabelValues(group.Name)
if item.PrevKv != nil {
if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil {
continue
}
if _, ok := c.groupsController.LoadAndDelete(group.Name); ok {
resourceGroupStatusGauge.DeleteLabelValues(group.Name)
}
} else {
// Prev-kv is compacted means there must have been a delete event before this event,
// which means that this is just a duplicated event, so we can just ignore it.
log.Info("previous key-value pair has been compacted", zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value)))
}
}
}
Expand Down
47 changes: 0 additions & 47 deletions client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ type ResourceManagerClient interface {
ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error)
WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error)
AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
Watch(ctx context.Context, key []byte, opts ...OpOption) (chan []*meta_storagepb.Event, error)
}
Expand Down Expand Up @@ -188,52 +187,6 @@ func (c *client) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup,
return groups, resp.Header.Revision, nil
}

// WatchResourceGroup [just for TEST] watches resource groups changes.
// It returns a stream of slices of resource groups.
// The first message in stream contains all current resource groups,
// all subsequent messages contains new events[PUT/DELETE] for all resource groups.
func (c *client) WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error) {
configChan, err := c.Watch(ctx, GroupSettingsPathPrefixBytes, WithRev(revision), WithPrefix())
if err != nil {
return nil, err
}
resourceGroupWatcherChan := make(chan []*rmpb.ResourceGroup)
go func() {
defer func() {
close(resourceGroupWatcherChan)
if r := recover(); r != nil {
log.Error("[pd] panic in ResourceManagerClient `WatchResourceGroups`", zap.Any("error", r))
return
}
}()
for {
select {
case <-ctx.Done():
return
case res, ok := <-configChan:
if !ok {
return
}
groups := make([]*rmpb.ResourceGroup, 0, len(res))
for _, item := range res {
switch item.Type {
case meta_storagepb.Event_PUT:
group := &rmpb.ResourceGroup{}
if err := proto.Unmarshal(item.Kv.Value, group); err != nil {
return
}
groups = append(groups, group)
case meta_storagepb.Event_DELETE:
continue
}
}
resourceGroupWatcherChan <- groups
}
}
}()
return resourceGroupWatcherChan, err
}

func (c *client) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) {
req := &tokenRequest{
done: make(chan error, 1),
Expand Down
2 changes: 1 addition & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1964,7 +1964,7 @@ func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, serve
} else {
// Prev-kv is compacted means there must have been a delete event before this event,
// which means that this is just a duplicated event, so we can just ignore it.
log.Info("previous key-value pair has been compacted", zap.String("previous key", string(e.Kv.Key)))
log.Info("previous key-value pair has been compacted", zap.String("required-key", string(e.Kv.Key)))
}
}
}
Expand Down
166 changes: 86 additions & 80 deletions tests/integrations/mcs/resource_manager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,51 +191,25 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() {
},
},
}
// Mock get revision by listing
for i := 0; i < 3; i++ {
group.Name += strconv.Itoa(i)
resp, err := cli.AddResourceGroup(suite.ctx, group)
group.Name = "test"
re.NoError(err)
re.Contains(resp, "Success!")
}
lresp, revision, err := cli.LoadResourceGroups(suite.ctx)
re.NoError(err)
re.Equal(len(lresp), 4)
re.Greater(revision, int64(0))
tcs := tokenConsumptionPerSecond{rruTokensAtATime: 100}
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/disableWatch", "return(true)"))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/disableWatch"))
}()
controllerKeySpace, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil, controller.EnableSingleGroupByKeyspace())
controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil)
controller.Start(suite.ctx)
defer controller.Stop()
controller.OnRequestWait(suite.ctx, "test0", tcs.makeReadRequest())
meta := controller.GetActiveResourceGroup("test0")
metaShadow, err := controller.GetResourceGroup("test0")
re.NoError(err)
re.Equal(meta.RUSettings.RU, group.RUSettings.RU)
re.Equal(metaShadow.RUSettings.RU, group.RUSettings.RU)

controllerKeySpace.OnRequestWait(suite.ctx, "test0", tcs.makeReadRequest())
metaKeySpace := controllerKeySpace.GetActiveResourceGroup("test0")
re.Equal(metaKeySpace.RUSettings.RU, group.RUSettings.RU)

controller.OnRequestWait(suite.ctx, "test1", tcs.makeReadRequest())
meta = controller.GetActiveResourceGroup("test1")
metaShadow, err = controller.GetResourceGroup("test1")
re.NoError(err)
re.Equal(meta.RUSettings.RU, group.RUSettings.RU)
re.Equal(metaShadow.RUSettings.RU, group.RUSettings.RU)
suite.NoError(err)
// Mock add resource groups
for i := 3; i < 9; i++ {
var meta *rmpb.ResourceGroup
groupsNum := 10
for i := 0; i < groupsNum; i++ {
group.Name = "test" + strconv.Itoa(i)
resp, err := cli.AddResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")

// Make sure the resource group active
meta, err = controller.GetResourceGroup(group.Name)
re.NotNil(meta)
re.NoError(err)
meta = controller.GetActiveResourceGroup(group.Name)
re.NotNil(meta)
}
// Mock modify resource groups
modifySettings := func(gs *rmpb.ResourceGroup) {
Expand All @@ -247,65 +221,97 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() {
},
}
}
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/watchStreamError", "return(true)"))
for i := 0; i < 2; i++ {
if i == 1 {
testutil.Eventually(re, func() bool {
meta = controller.GetActiveResourceGroup("test0")
return meta.RUSettings.RU.Settings.FillRate == uint64(20000)
}, testutil.WithTickInterval(50*time.Millisecond))
metaKeySpace = controllerKeySpace.GetActiveResourceGroup("test0")
re.Equal(metaKeySpace.RUSettings.RU.Settings.FillRate, uint64(10000))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/watchStreamError", "return(true)"))
}
for i := 0; i < groupsNum; i++ {
group.Name = "test" + strconv.Itoa(i)
modifySettings(group)
resp, err := cli.ModifyResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")
}
time.Sleep(time.Millisecond * 50)
meta = controller.GetActiveResourceGroup("test1")
re.Equal(meta.RUSettings.RU.Settings.FillRate, uint64(10000))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/watchStreamError"))
for i := 0; i < groupsNum; i++ {
testutil.Eventually(re, func() bool {
name := "test" + strconv.Itoa(i)
meta = controller.GetActiveResourceGroup(name)
if meta != nil {
return meta.RUSettings.RU.Settings.FillRate == uint64(20000)
}
return false
}, testutil.WithTickInterval(50*time.Millisecond))
}

// Mock reset watch stream
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/watchStreamError", "return(true)"))
group.Name = "test" + strconv.Itoa(groupsNum)
resp, err := cli.AddResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")
// Make sure the resource group active
meta, err = controller.GetResourceGroup(group.Name)
re.NotNil(meta)
re.NoError(err)
modifySettings(group)
resp, err = cli.ModifyResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")
testutil.Eventually(re, func() bool {
meta = controller.GetActiveResourceGroup("test1")
meta = controller.GetActiveResourceGroup(group.Name)
return meta.RUSettings.RU.Settings.FillRate == uint64(20000)
}, testutil.WithTickInterval(100*time.Millisecond))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/watchStreamError"))

for i := 2; i < 9; i++ {
group.Name = "test" + strconv.Itoa(i)
modifySettings(group)
resp, err := cli.ModifyResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")
}
// Mock delete resource groups
suite.cleanupResourceGroups()
time.Sleep(time.Second)
meta = controller.GetActiveResourceGroup(group.Name)
re.Nil(meta)
for i := 0; i < groupsNum; i++ {
testutil.Eventually(re, func() bool {
name := "test" + strconv.Itoa(i)
meta = controller.GetActiveResourceGroup(name)
return meta == nil
}, testutil.WithTickInterval(50*time.Millisecond))
}
}

// Check watch result
watchChan, err := suite.client.WatchResourceGroup(suite.ctx, revision)
re.NoError(err)
i := 0
for {
select {
case <-time.After(time.Second):
return
case res := <-watchChan:
for _, r := range res {
if i < 6 {
suite.Equal(uint64(10000), r.RUSettings.RU.Settings.FillRate)
} else {
suite.Equal(uint64(20000), r.RUSettings.RU.Settings.FillRate)
}
i++
}
}
func (suite *resourceManagerClientTestSuite) TestWatchWithSingleGroupByKeyspace() {
re := suite.Require()
cli := suite.client

// We need to disable watch stream for `isSingleGroupByKeyspace`.
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/disableWatch", "return(true)"))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/disableWatch"))
}()
// Distinguish the controller with and without enabling `isSingleGroupByKeyspace`.
controllerKeySpace, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil, controller.EnableSingleGroupByKeyspace())
controller, _ := controller.NewResourceGroupController(suite.ctx, 2, cli, nil)
controller.Start(suite.ctx)
controllerKeySpace.Start(suite.ctx)
defer controllerKeySpace.Stop()
defer controller.Stop()

// Mock add resource group.
group := &rmpb.ResourceGroup{
Name: "test",
Mode: rmpb.GroupMode_RUMode,
RUSettings: &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: 10000,
},
Tokens: 100000,
},
},
}
resp, err := cli.AddResourceGroup(suite.ctx, group)
re.NoError(err)
re.Contains(resp, "Success!")

tcs := tokenConsumptionPerSecond{rruTokensAtATime: 100}
controller.OnRequestWait(suite.ctx, group.Name, tcs.makeReadRequest())
meta := controller.GetActiveResourceGroup(group.Name)
re.Equal(meta.RUSettings.RU, group.RUSettings.RU)

controllerKeySpace.OnRequestWait(suite.ctx, group.Name, tcs.makeReadRequest())
metaKeySpace := controllerKeySpace.GetActiveResourceGroup(group.Name)
re.Equal(metaKeySpace.RUSettings.RU, group.RUSettings.RU)
}

const buffDuration = time.Millisecond * 300
Expand Down

0 comments on commit 63f7bd2

Please sign in to comment.