Skip to content

Commit

Permalink
etcdutil, mcs: fix the issue loading label rules is too slow (tikv#7718)
Browse files Browse the repository at this point in the history
close tikv#7724

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
lhy1024 and ti-chi-bot[bot] committed Jan 18, 2024
1 parent a90e13e commit da58fbe
Show file tree
Hide file tree
Showing 10 changed files with 269 additions and 112 deletions.
6 changes: 3 additions & 3 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ func MakeRegionBound(id uint32) *RegionBound {
}
}

// makeKeyRanges encodes keyspace ID to correct LabelRule data.
func makeKeyRanges(id uint32) []interface{} {
// MakeKeyRanges encodes keyspace ID to correct LabelRule data.
func MakeKeyRanges(id uint32) []interface{} {
regionBound := MakeRegionBound(id)
return []interface{}{
map[string]interface{}{
Expand Down Expand Up @@ -207,7 +207,7 @@ func MakeLabelRule(id uint32) *labeler.LabelRule {
},
},
RuleType: labeler.KeyRange,
Data: makeKeyRanges(id),
Data: MakeKeyRanges(id),
}
}

Expand Down
25 changes: 18 additions & 7 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (rw *Watcher) initializeRuleWatcher() error {
var suspectKeyRanges *core.KeyRanges

preEventsFn := func(events []*clientv3.Event) error {
// It will be locked until the postFn is finished.
// It will be locked until the postEventsFn is finished.
rw.ruleManager.Lock()
rw.patch = rw.ruleManager.BeginPatch()
suspectKeyRanges = &core.KeyRanges{}
Expand Down Expand Up @@ -188,7 +188,7 @@ func (rw *Watcher) initializeRuleWatcher() error {
}
postEventsFn := func(events []*clientv3.Event) error {
defer rw.ruleManager.Unlock()
if err := rw.ruleManager.TryCommitPatch(rw.patch); err != nil {
if err := rw.ruleManager.TryCommitPatchLocked(rw.patch); err != nil {
log.Error("failed to commit patch", zap.Error(err))
return err
}
Expand All @@ -212,26 +212,37 @@ func (rw *Watcher) initializeRuleWatcher() error {

func (rw *Watcher) initializeRegionLabelWatcher() error {
prefixToTrim := rw.regionLabelPathPrefix + "/"
// TODO: use txn in region labeler.
preEventsFn := func(events []*clientv3.Event) error {
// It will be locked until the postEventsFn is finished.
rw.regionLabeler.Lock()
return nil
}
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
log.Debug("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
rule, err := labeler.NewLabelRuleFromJSON(kv.Value)
if err != nil {
return err
}
return rw.regionLabeler.SetLabelRule(rule)
return rw.regionLabeler.SetLabelRuleLocked(rule)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
log.Info("delete region label rule", zap.String("key", key))
return rw.regionLabeler.DeleteLabelRule(strings.TrimPrefix(key, prefixToTrim))
return rw.regionLabeler.DeleteLabelRuleLocked(strings.TrimPrefix(key, prefixToTrim))
}
postEventsFn := func(events []*clientv3.Event) error {
defer rw.regionLabeler.Unlock()
rw.regionLabeler.BuildRangeListLocked()
return nil
}
rw.labelWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-region-label-watcher", rw.regionLabelPathPrefix,
func([]*clientv3.Event) error { return nil },
preEventsFn,
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
postEventsFn,
true, /* withPrefix */
)
rw.labelWatcher.StartWatchLoop()
Expand Down
113 changes: 113 additions & 0 deletions pkg/mcs/scheduling/server/rule/watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rule

import (
"context"
"encoding/json"
"os"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
)

const (
// clusterID is the cluster ID used to derive the etcd path prefixes
// (rules, rule groups, region labels) that this test reads and writes.
clusterID = uint64(20240117)
// rulesNum is the number of label rules that prepare writes to etcd and
// that the watcher is expected to have loaded afterwards.
rulesNum = 16384
)

func TestLoadLargeRules(t *testing.T) {
re := require.New(t)
ctx, client, clean := prepare(t)
defer clean()
runWatcherLoadLabelRule(ctx, re, client)
}

func BenchmarkLoadLargeRules(b *testing.B) {
re := require.New(b)
ctx, client, clean := prepare(b)
defer clean()

b.ResetTimer() // Resets the timer to ignore initialization time in the benchmark

for n := 0; n < b.N; n++ {
runWatcherLoadLabelRule(ctx, re, client)
}
}

// runWatcherLoadLabelRule builds a Watcher backed by an in-memory storage and a
// fresh RegionLabeler, starts its region label watcher against the given etcd
// client, and asserts that the labeler ends up holding rulesNum label rules.
//
// The watcher runs on a context derived from ctx that is cancelled when this
// function returns.
func runWatcherLoadLabelRule(ctx context.Context, re *require.Assertions, client *clientv3.Client) {
	storage := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
	labelerManager, err := labeler.NewRegionLabeler(ctx, storage, time.Hour)
	re.NoError(err)

	ctx, cancel := context.WithCancel(ctx)
	// Deferred rather than called at the end: a failing assertion aborts the
	// test goroutine, and only deferred calls still run in that case, so this
	// keeps the watcher context from outliving a failed run.
	defer cancel()

	rw := &Watcher{
		ctx:                   ctx,
		cancel:                cancel,
		rulesPathPrefix:       endpoint.RulesPathPrefix(clusterID),
		ruleCommonPathPrefix:  endpoint.RuleCommonPathPrefix(clusterID),
		ruleGroupPathPrefix:   endpoint.RuleGroupPathPrefix(clusterID),
		regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID),
		etcdClient:            client,
		ruleStorage:           storage,
		regionLabeler:         labelerManager,
	}
	err = rw.initializeRegionLabelWatcher()
	re.NoError(err)
	re.Len(labelerManager.GetAllLabelRules(), rulesNum)
}

// prepare starts an embedded single-node etcd, creates a client for it, and
// writes rulesNum key-range label rules under the region label path prefix of
// clusterID.
//
// It returns the context used for the writes, the etcd client, and a cleanup
// function that cancels the context, closes the client and the etcd server, and
// removes the etcd data directory. The caller must invoke the cleanup function.
func prepare(t require.TestingT) (context.Context, *clientv3.Client, func()) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	cfg := etcdutil.NewTestSingleConfig()
	// Use a unique data directory per invocation so that concurrent or
	// repeated runs never share (or wipe) each other's etcd state.
	dataDir, err := os.MkdirTemp("", "test_etcd")
	re.NoError(err)
	cfg.Dir = dataDir
	etcd, err := embed.StartEtcd(cfg)
	re.NoError(err)
	client, err := etcdutil.CreateEtcdClient(nil, cfg.LCUrls)
	re.NoError(err)
	<-etcd.Server.ReadyNotify()

	// Both values are loop-invariant; build them once rather than per rule.
	kvClient := clientv3.NewKV(client)
	keyPrefix := endpoint.RegionLabelPathPrefix(clusterID) + "/"
	for i := 1; i <= rulesNum; i++ {
		rule := &labeler.LabelRule{
			ID:       "test_" + strconv.Itoa(i),
			Labels:   []labeler.RegionLabel{{Key: "test", Value: "test"}},
			RuleType: labeler.KeyRange,
			Data:     keyspace.MakeKeyRanges(uint32(i)),
		}
		value, err := json.Marshal(rule)
		re.NoError(err)
		_, err = kvClient.Put(ctx, keyPrefix+rule.ID, string(value))
		re.NoError(err)
	}

	return ctx, client, func() {
		cancel()
		client.Close()
		etcd.Close()
		os.RemoveAll(cfg.Dir)
	}
}
33 changes: 25 additions & 8 deletions pkg/schedule/labeler/labeler.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (l *RegionLabeler) checkAndClearExpiredLabels() {
}
}
if deleted {
l.buildRangeList()
l.BuildRangeListLocked()
}
}

Expand Down Expand Up @@ -128,11 +128,12 @@ func (l *RegionLabeler) loadRules() error {
return err
}
}
l.buildRangeList()
l.BuildRangeListLocked()
return nil
}

func (l *RegionLabeler) buildRangeList() {
// BuildRangeListLocked builds the range list.
func (l *RegionLabeler) BuildRangeListLocked() {
builder := rangelist.NewBuilder()
l.minExpire = nil
for _, rule := range l.labelRules {
Expand Down Expand Up @@ -206,31 +207,47 @@ func (l *RegionLabeler) getAndCheckRule(id string, now time.Time) *LabelRule {

// SetLabelRule inserts or updates a LabelRule.
func (l *RegionLabeler) SetLabelRule(rule *LabelRule) error {
l.Lock()
defer l.Unlock()
if err := l.SetLabelRuleLocked(rule); err != nil {
return err
}
l.BuildRangeListLocked()
return nil
}

// SetLabelRuleLocked inserts or updates a LabelRule without rebuilding the range list. The caller must hold the lock.
func (l *RegionLabeler) SetLabelRuleLocked(rule *LabelRule) error {
if err := rule.checkAndAdjust(); err != nil {
return err
}
l.Lock()
defer l.Unlock()
if err := l.storage.SaveRegionRule(rule.ID, rule); err != nil {
return err
}
l.labelRules[rule.ID] = rule
l.buildRangeList()
return nil
}

// DeleteLabelRule removes a LabelRule.
func (l *RegionLabeler) DeleteLabelRule(id string) error {
l.Lock()
defer l.Unlock()
if err := l.DeleteLabelRuleLocked(id); err != nil {
return err
}
l.BuildRangeListLocked()
return nil
}

// DeleteLabelRuleLocked removes a LabelRule without rebuilding the range list. The caller must hold the lock.
func (l *RegionLabeler) DeleteLabelRuleLocked(id string) error {
if _, ok := l.labelRules[id]; !ok {
return errs.ErrRegionRuleNotFound.FastGenByArgs(id)
}
if err := l.storage.DeleteRegionRule(id); err != nil {
return err
}
delete(l.labelRules, id)
l.buildRangeList()
return nil
}

Expand Down Expand Up @@ -264,7 +281,7 @@ func (l *RegionLabeler) Patch(patch LabelRulePatch) error {
for _, rule := range patch.SetRules {
l.labelRules[rule.ID] = rule
}
l.buildRangeList()
l.BuildRangeListLocked()
return nil
}

Expand Down
22 changes: 11 additions & 11 deletions pkg/schedule/placement/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (m *RuleManager) SetRule(rule *Rule) error {
defer m.Unlock()
p := m.BeginPatch()
p.SetRule(rule)
if err := m.TryCommitPatch(p); err != nil {
if err := m.TryCommitPatchLocked(p); err != nil {
return err
}
log.Info("placement rule updated", zap.String("rule", fmt.Sprint(rule)))
Expand All @@ -324,7 +324,7 @@ func (m *RuleManager) DeleteRule(group, id string) error {
defer m.Unlock()
p := m.BeginPatch()
p.DeleteRule(group, id)
if err := m.TryCommitPatch(p); err != nil {
if err := m.TryCommitPatchLocked(p); err != nil {
return err
}
log.Info("placement rule is removed", zap.String("group", group), zap.String("id", id))
Expand Down Expand Up @@ -469,8 +469,8 @@ func (m *RuleManager) BeginPatch() *RuleConfigPatch {
return m.ruleConfig.beginPatch()
}

// TryCommitPatch tries to commit a patch.
func (m *RuleManager) TryCommitPatch(patch *RuleConfigPatch) error {
// TryCommitPatchLocked tries to commit a patch.
func (m *RuleManager) TryCommitPatchLocked(patch *RuleConfigPatch) error {
patch.adjust()

ruleList, err := buildRuleList(patch)
Expand Down Expand Up @@ -535,7 +535,7 @@ func (m *RuleManager) SetRules(rules []*Rule) error {
}
p.SetRule(r)
}
if err := m.TryCommitPatch(p); err != nil {
if err := m.TryCommitPatchLocked(p); err != nil {
return err
}

Expand Down Expand Up @@ -598,7 +598,7 @@ func (m *RuleManager) Batch(todo []RuleOp) error {
}
}

if err := m.TryCommitPatch(patch); err != nil {
if err := m.TryCommitPatchLocked(patch); err != nil {
return err
}

Expand Down Expand Up @@ -634,7 +634,7 @@ func (m *RuleManager) SetRuleGroup(group *RuleGroup) error {
defer m.Unlock()
p := m.BeginPatch()
p.SetGroup(group)
if err := m.TryCommitPatch(p); err != nil {
if err := m.TryCommitPatchLocked(p); err != nil {
return err
}
log.Info("group config updated", zap.String("group", fmt.Sprint(group)))
Expand All @@ -647,7 +647,7 @@ func (m *RuleManager) DeleteRuleGroup(id string) error {
defer m.Unlock()
p := m.BeginPatch()
p.DeleteGroup(id)
if err := m.TryCommitPatch(p); err != nil {
if err := m.TryCommitPatchLocked(p); err != nil {
return err
}
log.Info("group config reset", zap.String("group", id))
Expand Down Expand Up @@ -737,7 +737,7 @@ func (m *RuleManager) SetAllGroupBundles(groups []GroupBundle, override bool) er
p.SetRule(r)
}
}
if err := m.TryCommitPatch(p); err != nil {
if err := m.TryCommitPatchLocked(p); err != nil {
return err
}
log.Info("full config reset", zap.String("config", fmt.Sprint(groups)))
Expand Down Expand Up @@ -768,7 +768,7 @@ func (m *RuleManager) SetGroupBundle(group GroupBundle) error {
}
p.SetRule(r)
}
if err := m.TryCommitPatch(p); err != nil {
if err := m.TryCommitPatchLocked(p); err != nil {
return err
}
log.Info("group is reset", zap.String("group", fmt.Sprint(group)))
Expand Down Expand Up @@ -800,7 +800,7 @@ func (m *RuleManager) DeleteGroupBundle(id string, regex bool) error {
p.DeleteGroup(g.ID)
}
}
if err := m.TryCommitPatch(p); err != nil {
if err := m.TryCommitPatchLocked(p); err != nil {
return err
}
log.Info("groups are removed", zap.String("id", id), zap.Bool("regexp", regex))
Expand Down
5 changes: 0 additions & 5 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,6 @@ type KeyspaceGroupManager struct {
// cfg is the TSO config
cfg ServiceConfig

// loadKeyspaceGroupsTimeout is the timeout for loading the initial keyspace group assignment.
loadKeyspaceGroupsTimeout time.Duration
loadKeyspaceGroupsBatchSize int64
loadFromEtcdMaxRetryTimes int

Expand Down Expand Up @@ -574,9 +572,6 @@ func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error {
postEventsFn,
true, /* withPrefix */
)
if kgm.loadKeyspaceGroupsTimeout > 0 {
kgm.groupWatcher.SetLoadTimeout(kgm.loadKeyspaceGroupsTimeout)
}
if kgm.loadFromEtcdMaxRetryTimes > 0 {
kgm.groupWatcher.SetLoadRetryTimes(kgm.loadFromEtcdMaxRetryTimes)
}
Expand Down
Loading

0 comments on commit da58fbe

Please sign in to comment.