Skip to content

Commit

Permalink
mcs: watch rule change with txn (tikv#7550)
Browse files Browse the repository at this point in the history
close tikv#7418

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
lhy1024 and ti-chi-bot[bot] committed Dec 20, 2023
1 parent 59c9d04 commit cfbc9b9
Show file tree
Hide file tree
Showing 13 changed files with 711 additions and 305 deletions.
21 changes: 21 additions & 0 deletions pkg/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,3 +309,24 @@ func NewKeyRange(startKey, endKey string) KeyRange {
EndKey: []byte(endKey),
}
}

// KeyRanges is a slice of KeyRange.
type KeyRanges struct {
krs []*KeyRange
}

// Append appends a KeyRange.
func (rs *KeyRanges) Append(startKey, endKey []byte) {
rs.krs = append(rs.krs, &KeyRange{
StartKey: startKey,
EndKey: endKey,
})
}

// Ranges returns the slice of KeyRange.
func (rs *KeyRanges) Ranges() []*KeyRange {
if rs == nil {
return nil
}
return rs.krs
}
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1330,5 +1330,5 @@ func checkRegionsReplicated(c *gin.Context) {
c.String(http.StatusBadRequest, err.Error())
return
}
c.String(http.StatusOK, state)
c.IndentedJSON(http.StatusOK, state)
}
154 changes: 92 additions & 62 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule/checker"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
Expand All @@ -36,6 +37,10 @@ type Watcher struct {
cancel context.CancelFunc
wg sync.WaitGroup

// ruleCommonPathPrefix:
// - Key: /pd/{cluster_id}/rule
// - Value: placement.Rule or placement.RuleGroup
ruleCommonPathPrefix string
// rulesPathPrefix:
// - Key: /pd/{cluster_id}/rules/{group_id}-{rule_id}
// - Value: placement.Rule
Expand All @@ -60,8 +65,10 @@ type Watcher struct {
regionLabeler *labeler.RegionLabeler

ruleWatcher *etcdutil.LoopWatcher
groupWatcher *etcdutil.LoopWatcher
labelWatcher *etcdutil.LoopWatcher

// patch is used to cache the placement rule changes.
patch *placement.RuleConfigPatch
}

// NewWatcher creates a new watcher to watch the Placement Rule change from PD API server.
Expand All @@ -79,6 +86,7 @@ func NewWatcher(
ctx: ctx,
cancel: cancel,
rulesPathPrefix: endpoint.RulesPathPrefix(clusterID),
ruleCommonPathPrefix: endpoint.RuleCommonPathPrefix(clusterID),
ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID),
regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID),
etcdClient: etcdClient,
Expand All @@ -91,10 +99,6 @@ func NewWatcher(
if err != nil {
return nil, err
}
err = rw.initializeGroupWatcher()
if err != nil {
return nil, err
}
err = rw.initializeRegionLabelWatcher()
if err != nil {
return nil, err
Expand All @@ -103,83 +107,109 @@ func NewWatcher(
}

func (rw *Watcher) initializeRuleWatcher() error {
prefixToTrim := rw.rulesPathPrefix + "/"
var suspectKeyRanges *core.KeyRanges

preEventsFn := func(events []*clientv3.Event) error {
// It will be locked until the postFn is finished.
rw.ruleManager.Lock()
rw.patch = rw.ruleManager.BeginPatch()
suspectKeyRanges = &core.KeyRanges{}
return nil
}

putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
rule, err := placement.NewRuleFromJSON(kv.Value)
if err != nil {
return err
}
// Update the suspect key ranges in the checker.
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil {
rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey)
key := string(kv.Key)
if strings.HasPrefix(key, rw.rulesPathPrefix) {
log.Info("update placement rule", zap.String("key", key), zap.String("value", string(kv.Value)))
rule, err := placement.NewRuleFromJSON(kv.Value)
if err != nil {
return err
}
// Try to add the rule change to the patch.
if err := rw.ruleManager.AdjustRule(rule, ""); err != nil {
return err
}
rw.patch.SetRule(rule)
// Update the suspect key ranges in lock.
suspectKeyRanges.Append(rule.StartKey, rule.EndKey)
if oldRule := rw.ruleManager.GetRuleLocked(rule.GroupID, rule.ID); oldRule != nil {
suspectKeyRanges.Append(oldRule.StartKey, oldRule.EndKey)
}
return nil
} else if strings.HasPrefix(key, rw.ruleGroupPathPrefix) {
log.Info("update placement rule group", zap.String("key", key), zap.String("value", string(kv.Value)))
ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value)
if err != nil {
return err
}
// Try to add the rule group change to the patch.
rw.patch.SetGroup(ruleGroup)
// Update the suspect key ranges
for _, rule := range rw.ruleManager.GetRulesByGroupLocked(ruleGroup.ID) {
suspectKeyRanges.Append(rule.StartKey, rule.EndKey)
}
return nil
} else {
log.Warn("unknown key when updating placement rule", zap.String("key", key))
return nil
}
return rw.ruleManager.SetRule(rule)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
log.Info("delete placement rule", zap.String("key", key))
ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, prefixToTrim))
if err != nil {
if strings.HasPrefix(key, rw.rulesPathPrefix) {
log.Info("delete placement rule", zap.String("key", key))
ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, rw.rulesPathPrefix+"/"))
if err != nil {
return err
}
rule, err := placement.NewRuleFromJSON([]byte(ruleJSON))
if err != nil {
return err
}
// Try to add the rule change to the patch.
rw.patch.DeleteRule(rule.GroupID, rule.ID)
// Update the suspect key ranges
suspectKeyRanges.Append(rule.StartKey, rule.EndKey)
return err
} else if strings.HasPrefix(key, rw.ruleGroupPathPrefix) {
log.Info("delete placement rule group", zap.String("key", key))
trimmedKey := strings.TrimPrefix(key, rw.ruleGroupPathPrefix+"/")
// Try to add the rule group change to the patch.
rw.patch.DeleteGroup(trimmedKey)
// Update the suspect key ranges
for _, rule := range rw.ruleManager.GetRulesByGroupLocked(trimmedKey) {
suspectKeyRanges.Append(rule.StartKey, rule.EndKey)
}
return nil
} else {
log.Warn("unknown key when deleting placement rule", zap.String("key", key))
return nil
}
rule, err := placement.NewRuleFromJSON([]byte(ruleJSON))
if err != nil {
}
postEventsFn := func(events []*clientv3.Event) error {
defer rw.ruleManager.Unlock()
if err := rw.ruleManager.TryCommitPatch(rw.patch); err != nil {
log.Error("failed to commit patch", zap.Error(err))
return err
}
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
return rw.ruleManager.DeleteRule(rule.GroupID, rule.ID)
for _, kr := range suspectKeyRanges.Ranges() {
rw.checkerController.AddSuspectKeyRange(kr.StartKey, kr.EndKey)
}
return nil
}
rw.ruleWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-rule-watcher", rw.rulesPathPrefix,
func([]*clientv3.Event) error { return nil },
"scheduling-rule-watcher", rw.ruleCommonPathPrefix,
preEventsFn,
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
postEventsFn,
clientv3.WithPrefix(),
)
rw.ruleWatcher.StartWatchLoop()
return rw.ruleWatcher.WaitLoad()
}

func (rw *Watcher) initializeGroupWatcher() error {
prefixToTrim := rw.ruleGroupPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value)
if err != nil {
return err
}
// Add all rule key ranges within the group to the suspect key ranges.
for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rw.ruleManager.SetRuleGroup(ruleGroup)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
log.Info("delete placement rule group", zap.String("key", key))
trimmedKey := strings.TrimPrefix(key, prefixToTrim)
for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rw.ruleManager.DeleteRuleGroup(trimmedKey)
}
rw.groupWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-rule-group-watcher", rw.ruleGroupPathPrefix,
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithPrefix(),
)
rw.groupWatcher.StartWatchLoop()
return rw.groupWatcher.WaitLoad()
}

func (rw *Watcher) initializeRegionLabelWatcher() error {
prefixToTrim := rw.regionLabelPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
Expand Down
32 changes: 18 additions & 14 deletions pkg/schedule/placement/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,28 +79,30 @@ func (c *ruleConfig) getGroup(id string) *RuleGroup {
return &RuleGroup{ID: id}
}

func (c *ruleConfig) beginPatch() *ruleConfigPatch {
return &ruleConfigPatch{
func (c *ruleConfig) beginPatch() *RuleConfigPatch {
return &RuleConfigPatch{
c: c,
mut: newRuleConfig(),
}
}

// A helper data structure to update ruleConfig.
type ruleConfigPatch struct {
// RuleConfigPatch is a helper data structure to update ruleConfig.
type RuleConfigPatch struct {
c *ruleConfig // original configuration to be updated
mut *ruleConfig // record all to-commit rules and groups
}

func (p *ruleConfigPatch) setRule(r *Rule) {
// SetRule sets a rule to the patch.
func (p *RuleConfigPatch) SetRule(r *Rule) {
p.mut.rules[r.Key()] = r
}

func (p *ruleConfigPatch) deleteRule(group, id string) {
// DeleteRule deletes a rule from the patch.
func (p *RuleConfigPatch) DeleteRule(group, id string) {
p.mut.rules[[2]string{group, id}] = nil
}

func (p *ruleConfigPatch) getGroup(id string) *RuleGroup {
func (p *RuleConfigPatch) getGroup(id string) *RuleGroup {
if g, ok := p.mut.groups[id]; ok {
return g
}
Expand All @@ -110,15 +112,17 @@ func (p *ruleConfigPatch) getGroup(id string) *RuleGroup {
return &RuleGroup{ID: id}
}

func (p *ruleConfigPatch) setGroup(g *RuleGroup) {
// SetGroup sets a group to the patch.
func (p *RuleConfigPatch) SetGroup(g *RuleGroup) {
p.mut.groups[g.ID] = g
}

func (p *ruleConfigPatch) deleteGroup(id string) {
p.setGroup(&RuleGroup{ID: id})
// DeleteGroup deletes a group from the patch.
func (p *RuleConfigPatch) DeleteGroup(id string) {
p.SetGroup(&RuleGroup{ID: id})
}

func (p *ruleConfigPatch) iterateRules(f func(*Rule)) {
func (p *RuleConfigPatch) iterateRules(f func(*Rule)) {
for _, r := range p.mut.rules {
if r != nil { // nil means delete.
f(r)
Expand All @@ -131,13 +135,13 @@ func (p *ruleConfigPatch) iterateRules(f func(*Rule)) {
}
}

func (p *ruleConfigPatch) adjust() {
func (p *RuleConfigPatch) adjust() {
// setup rule.group for `buildRuleList` use.
p.iterateRules(func(r *Rule) { r.group = p.getGroup(r.GroupID) })
}

// trim unnecessary updates. For example, remove a rule then insert the same rule.
func (p *ruleConfigPatch) trim() {
func (p *RuleConfigPatch) trim() {
for key, rule := range p.mut.rules {
if jsonEquals(rule, p.c.getRule(key)) {
delete(p.mut.rules, key)
Expand All @@ -151,7 +155,7 @@ func (p *ruleConfigPatch) trim() {
}

// merge all mutations to ruleConfig.
func (p *ruleConfigPatch) commit() {
func (p *RuleConfigPatch) commit() {
for key, rule := range p.mut.rules {
if rule == nil {
delete(p.c.rules, key)
Expand Down
40 changes: 20 additions & 20 deletions pkg/schedule/placement/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,40 +30,40 @@ func TestTrim(t *testing.T) {
rc.setGroup(&RuleGroup{ID: "g2", Index: 2})

testCases := []struct {
ops func(p *ruleConfigPatch)
ops func(p *RuleConfigPatch)
mutRules map[[2]string]*Rule
mutGroups map[string]*RuleGroup
}{
{
func(p *ruleConfigPatch) {
p.setRule(&Rule{GroupID: "g1", ID: "id1", Index: 100})
p.setRule(&Rule{GroupID: "g1", ID: "id2"})
p.setGroup(&RuleGroup{ID: "g1", Index: 100})
p.setGroup(&RuleGroup{ID: "g2", Index: 2})
func(p *RuleConfigPatch) {
p.SetRule(&Rule{GroupID: "g1", ID: "id1", Index: 100})
p.SetRule(&Rule{GroupID: "g1", ID: "id2"})
p.SetGroup(&RuleGroup{ID: "g1", Index: 100})
p.SetGroup(&RuleGroup{ID: "g2", Index: 2})
},
map[[2]string]*Rule{{"g1", "id1"}: {GroupID: "g1", ID: "id1", Index: 100}},
map[string]*RuleGroup{"g1": {ID: "g1", Index: 100}},
},
{
func(p *ruleConfigPatch) {
p.deleteRule("g1", "id1")
p.deleteGroup("g2")
p.deleteRule("g3", "id3")
p.deleteGroup("g3")
func(p *RuleConfigPatch) {
p.DeleteRule("g1", "id1")
p.DeleteGroup("g2")
p.DeleteRule("g3", "id3")
p.DeleteGroup("g3")
},
map[[2]string]*Rule{{"g1", "id1"}: nil},
map[string]*RuleGroup{"g2": {ID: "g2"}},
},
{
func(p *ruleConfigPatch) {
p.setRule(&Rule{GroupID: "g1", ID: "id2", Index: 200})
p.setRule(&Rule{GroupID: "g1", ID: "id2"})
p.setRule(&Rule{GroupID: "g3", ID: "id3"})
p.deleteRule("g3", "id3")
p.setGroup(&RuleGroup{ID: "g1", Index: 100})
p.setGroup(&RuleGroup{ID: "g1", Index: 1})
p.setGroup(&RuleGroup{ID: "g3", Index: 3})
p.deleteGroup("g3")
func(p *RuleConfigPatch) {
p.SetRule(&Rule{GroupID: "g1", ID: "id2", Index: 200})
p.SetRule(&Rule{GroupID: "g1", ID: "id2"})
p.SetRule(&Rule{GroupID: "g3", ID: "id3"})
p.DeleteRule("g3", "id3")
p.SetGroup(&RuleGroup{ID: "g1", Index: 100})
p.SetGroup(&RuleGroup{ID: "g1", Index: 1})
p.SetGroup(&RuleGroup{ID: "g3", Index: 3})
p.DeleteGroup("g3")
},
map[[2]string]*Rule{},
map[string]*RuleGroup{},
Expand Down
Loading

0 comments on commit cfbc9b9

Please sign in to comment.