From cfbc9b96cdc23d4f7c723b0b4130f6612f9ed00a Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 20 Dec 2023 12:01:52 +0800 Subject: [PATCH] mcs: watch rule change with txn (#7550) close tikv/pd#7418 Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/core/basic_cluster.go | 21 ++ pkg/mcs/scheduling/server/apis/v1/api.go | 2 +- pkg/mcs/scheduling/server/rule/watcher.go | 154 +++++---- pkg/schedule/placement/config.go | 32 +- pkg/schedule/placement/config_test.go | 40 +-- pkg/schedule/placement/rule_manager.go | 254 +++++++------- pkg/schedule/placement/rule_manager_test.go | 14 +- pkg/storage/endpoint/rule.go | 63 ++-- pkg/tso/keyspace_group_manager.go | 4 +- pkg/utils/etcdutil/etcdutil.go | 24 +- server/keyspace_service.go | 7 +- tests/server/api/region_test.go | 56 +++- tests/server/api/rule_test.go | 345 ++++++++++++++++++-- 13 files changed, 711 insertions(+), 305 deletions(-) diff --git a/pkg/core/basic_cluster.go b/pkg/core/basic_cluster.go index 2258a816324..d70b620db3b 100644 --- a/pkg/core/basic_cluster.go +++ b/pkg/core/basic_cluster.go @@ -309,3 +309,24 @@ func NewKeyRange(startKey, endKey string) KeyRange { EndKey: []byte(endKey), } } + +// KeyRanges is a slice of KeyRange. +type KeyRanges struct { + krs []*KeyRange +} + +// Append appends a KeyRange. +func (rs *KeyRanges) Append(startKey, endKey []byte) { + rs.krs = append(rs.krs, &KeyRange{ + StartKey: startKey, + EndKey: endKey, + }) +} + +// Ranges returns the slice of KeyRange. +func (rs *KeyRanges) Ranges() []*KeyRange { + if rs == nil { + return nil + } + return rs.krs +} diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index b59780b7a61..e6881f2f85c 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -1330,5 +1330,5 @@ func checkRegionsReplicated(c *gin.Context) { c.String(http.StatusBadRequest, err.Error()) return } - c.String(http.StatusOK, state) + c.IndentedJSON(http.StatusOK, state) } diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index 96e19cf5002..3e11cf9ff9d 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -20,6 +20,7 @@ import ( "sync" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/schedule/checker" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/placement" @@ -36,6 +37,10 @@ type Watcher struct { cancel context.CancelFunc wg sync.WaitGroup + // ruleCommonPathPrefix: + // - Key: /pd/{cluster_id}/rule + // - Value: placement.Rule or placement.RuleGroup + ruleCommonPathPrefix string // rulesPathPrefix: // - Key: /pd/{cluster_id}/rules/{group_id}-{rule_id} // - Value: placement.Rule @@ -60,8 +65,10 @@ type Watcher struct { regionLabeler *labeler.RegionLabeler ruleWatcher *etcdutil.LoopWatcher - groupWatcher *etcdutil.LoopWatcher labelWatcher *etcdutil.LoopWatcher + + // patch is used to cache the placement rule changes. + patch *placement.RuleConfigPatch } // NewWatcher creates a new watcher to watch the Placement Rule change from PD API server. @@ -79,6 +86,7 @@ func NewWatcher( ctx: ctx, cancel: cancel, rulesPathPrefix: endpoint.RulesPathPrefix(clusterID), + ruleCommonPathPrefix: endpoint.RuleCommonPathPrefix(clusterID), ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID), regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID), etcdClient: etcdClient, @@ -91,10 +99,6 @@ func NewWatcher( if err != nil { return nil, err } - err = rw.initializeGroupWatcher() - if err != nil { - return nil, err - } err = rw.initializeRegionLabelWatcher() if err != nil { return nil, err @@ -103,83 +107,109 @@ func NewWatcher( } func (rw *Watcher) initializeRuleWatcher() error { - prefixToTrim := rw.rulesPathPrefix + "/" + var suspectKeyRanges *core.KeyRanges + + preEventsFn := func(events []*clientv3.Event) error { + // It will be locked until the postFn is finished. + rw.ruleManager.Lock() + rw.patch = rw.ruleManager.BeginPatch() + suspectKeyRanges = &core.KeyRanges{} + return nil + } + putFn := func(kv *mvccpb.KeyValue) error { - log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) - rule, err := placement.NewRuleFromJSON(kv.Value) - if err != nil { - return err - } - // Update the suspect key ranges in the checker. - rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) - if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil { - rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey) + key := string(kv.Key) + if strings.HasPrefix(key, rw.rulesPathPrefix) { + log.Info("update placement rule", zap.String("key", key), zap.String("value", string(kv.Value))) + rule, err := placement.NewRuleFromJSON(kv.Value) + if err != nil { + return err + } + // Try to add the rule change to the patch. + if err := rw.ruleManager.AdjustRule(rule, ""); err != nil { + return err + } + rw.patch.SetRule(rule) + // Update the suspect key ranges in lock. + suspectKeyRanges.Append(rule.StartKey, rule.EndKey) + if oldRule := rw.ruleManager.GetRuleLocked(rule.GroupID, rule.ID); oldRule != nil { + suspectKeyRanges.Append(oldRule.StartKey, oldRule.EndKey) + } + return nil + } else if strings.HasPrefix(key, rw.ruleGroupPathPrefix) { + log.Info("update placement rule group", zap.String("key", key), zap.String("value", string(kv.Value))) + ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value) + if err != nil { + return err + } + // Try to add the rule group change to the patch. + rw.patch.SetGroup(ruleGroup) + // Update the suspect key ranges + for _, rule := range rw.ruleManager.GetRulesByGroupLocked(ruleGroup.ID) { + suspectKeyRanges.Append(rule.StartKey, rule.EndKey) + } + return nil + } else { + log.Warn("unknown key when updating placement rule", zap.String("key", key)) + return nil } - return rw.ruleManager.SetRule(rule) } deleteFn := func(kv *mvccpb.KeyValue) error { key := string(kv.Key) - log.Info("delete placement rule", zap.String("key", key)) - ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, prefixToTrim)) - if err != nil { + if strings.HasPrefix(key, rw.rulesPathPrefix) { + log.Info("delete placement rule", zap.String("key", key)) + ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, rw.rulesPathPrefix+"/")) + if err != nil { + return err + } + rule, err := placement.NewRuleFromJSON([]byte(ruleJSON)) + if err != nil { + return err + } + // Try to add the rule change to the patch. + rw.patch.DeleteRule(rule.GroupID, rule.ID) + // Update the suspect key ranges + suspectKeyRanges.Append(rule.StartKey, rule.EndKey) return err + } else if strings.HasPrefix(key, rw.ruleGroupPathPrefix) { + log.Info("delete placement rule group", zap.String("key", key)) + trimmedKey := strings.TrimPrefix(key, rw.ruleGroupPathPrefix+"/") + // Try to add the rule group change to the patch. + rw.patch.DeleteGroup(trimmedKey) + // Update the suspect key ranges + for _, rule := range rw.ruleManager.GetRulesByGroupLocked(trimmedKey) { + suspectKeyRanges.Append(rule.StartKey, rule.EndKey) + } + return nil + } else { + log.Warn("unknown key when deleting placement rule", zap.String("key", key)) + return nil } - rule, err := placement.NewRuleFromJSON([]byte(ruleJSON)) - if err != nil { + } + postEventsFn := func(events []*clientv3.Event) error { + defer rw.ruleManager.Unlock() + if err := rw.ruleManager.TryCommitPatch(rw.patch); err != nil { + log.Error("failed to commit patch", zap.Error(err)) return err } - rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) - return rw.ruleManager.DeleteRule(rule.GroupID, rule.ID) + for _, kr := range suspectKeyRanges.Ranges() { + rw.checkerController.AddSuspectKeyRange(kr.StartKey, kr.EndKey) + } + return nil } rw.ruleWatcher = etcdutil.NewLoopWatcher( rw.ctx, &rw.wg, rw.etcdClient, - "scheduling-rule-watcher", rw.rulesPathPrefix, - func([]*clientv3.Event) error { return nil }, + "scheduling-rule-watcher", rw.ruleCommonPathPrefix, + preEventsFn, putFn, deleteFn, - func([]*clientv3.Event) error { return nil }, + postEventsFn, clientv3.WithPrefix(), ) rw.ruleWatcher.StartWatchLoop() return rw.ruleWatcher.WaitLoad() } -func (rw *Watcher) initializeGroupWatcher() error { - prefixToTrim := rw.ruleGroupPathPrefix + "/" - putFn := func(kv *mvccpb.KeyValue) error { - log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) - ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value) - if err != nil { - return err - } - // Add all rule key ranges within the group to the suspect key ranges. - for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) { - rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) - } - return rw.ruleManager.SetRuleGroup(ruleGroup) - } - deleteFn := func(kv *mvccpb.KeyValue) error { - key := string(kv.Key) - log.Info("delete placement rule group", zap.String("key", key)) - trimmedKey := strings.TrimPrefix(key, prefixToTrim) - for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) { - rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) - } - return rw.ruleManager.DeleteRuleGroup(trimmedKey) - } - rw.groupWatcher = etcdutil.NewLoopWatcher( - rw.ctx, &rw.wg, - rw.etcdClient, - "scheduling-rule-group-watcher", rw.ruleGroupPathPrefix, - func([]*clientv3.Event) error { return nil }, - putFn, deleteFn, - func([]*clientv3.Event) error { return nil }, - clientv3.WithPrefix(), - ) - rw.groupWatcher.StartWatchLoop() - return rw.groupWatcher.WaitLoad() -} - func (rw *Watcher) initializeRegionLabelWatcher() error { prefixToTrim := rw.regionLabelPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { diff --git a/pkg/schedule/placement/config.go b/pkg/schedule/placement/config.go index 878db4b2e0a..00c0f94b94e 100644 --- a/pkg/schedule/placement/config.go +++ b/pkg/schedule/placement/config.go @@ -79,28 +79,30 @@ func (c *ruleConfig) getGroup(id string) *RuleGroup { return &RuleGroup{ID: id} } -func (c *ruleConfig) beginPatch() *ruleConfigPatch { - return &ruleConfigPatch{ +func (c *ruleConfig) beginPatch() *RuleConfigPatch { + return &RuleConfigPatch{ c: c, mut: newRuleConfig(), } } -// A helper data structure to update ruleConfig. -type ruleConfigPatch struct { +// RuleConfigPatch is a helper data structure to update ruleConfig. +type RuleConfigPatch struct { c *ruleConfig // original configuration to be updated mut *ruleConfig // record all to-commit rules and groups } -func (p *ruleConfigPatch) setRule(r *Rule) { +// SetRule sets a rule to the patch. +func (p *RuleConfigPatch) SetRule(r *Rule) { p.mut.rules[r.Key()] = r } -func (p *ruleConfigPatch) deleteRule(group, id string) { +// DeleteRule deletes a rule from the patch. +func (p *RuleConfigPatch) DeleteRule(group, id string) { p.mut.rules[[2]string{group, id}] = nil } -func (p *ruleConfigPatch) getGroup(id string) *RuleGroup { +func (p *RuleConfigPatch) getGroup(id string) *RuleGroup { if g, ok := p.mut.groups[id]; ok { return g } @@ -110,15 +112,17 @@ func (p *ruleConfigPatch) getGroup(id string) *RuleGroup { return &RuleGroup{ID: id} } -func (p *ruleConfigPatch) setGroup(g *RuleGroup) { +// SetGroup sets a group to the patch. +func (p *RuleConfigPatch) SetGroup(g *RuleGroup) { p.mut.groups[g.ID] = g } -func (p *ruleConfigPatch) deleteGroup(id string) { - p.setGroup(&RuleGroup{ID: id}) +// DeleteGroup deletes a group from the patch. +func (p *RuleConfigPatch) DeleteGroup(id string) { + p.SetGroup(&RuleGroup{ID: id}) } -func (p *ruleConfigPatch) iterateRules(f func(*Rule)) { +func (p *RuleConfigPatch) iterateRules(f func(*Rule)) { for _, r := range p.mut.rules { if r != nil { // nil means delete. f(r) @@ -131,13 +135,13 @@ func (p *ruleConfigPatch) iterateRules(f func(*Rule)) { } } -func (p *ruleConfigPatch) adjust() { +func (p *RuleConfigPatch) adjust() { // setup rule.group for `buildRuleList` use. p.iterateRules(func(r *Rule) { r.group = p.getGroup(r.GroupID) }) } // trim unnecessary updates. For example, remove a rule then insert the same rule. -func (p *ruleConfigPatch) trim() { +func (p *RuleConfigPatch) trim() { for key, rule := range p.mut.rules { if jsonEquals(rule, p.c.getRule(key)) { delete(p.mut.rules, key) @@ -151,7 +155,7 @@ func (p *ruleConfigPatch) trim() { } // merge all mutations to ruleConfig. -func (p *ruleConfigPatch) commit() { +func (p *RuleConfigPatch) commit() { for key, rule := range p.mut.rules { if rule == nil { delete(p.c.rules, key) diff --git a/pkg/schedule/placement/config_test.go b/pkg/schedule/placement/config_test.go index 8f7161a56d7..ccee8837331 100644 --- a/pkg/schedule/placement/config_test.go +++ b/pkg/schedule/placement/config_test.go @@ -30,40 +30,40 @@ func TestTrim(t *testing.T) { rc.setGroup(&RuleGroup{ID: "g2", Index: 2}) testCases := []struct { - ops func(p *ruleConfigPatch) + ops func(p *RuleConfigPatch) mutRules map[[2]string]*Rule mutGroups map[string]*RuleGroup }{ { - func(p *ruleConfigPatch) { - p.setRule(&Rule{GroupID: "g1", ID: "id1", Index: 100}) - p.setRule(&Rule{GroupID: "g1", ID: "id2"}) - p.setGroup(&RuleGroup{ID: "g1", Index: 100}) - p.setGroup(&RuleGroup{ID: "g2", Index: 2}) + func(p *RuleConfigPatch) { + p.SetRule(&Rule{GroupID: "g1", ID: "id1", Index: 100}) + p.SetRule(&Rule{GroupID: "g1", ID: "id2"}) + p.SetGroup(&RuleGroup{ID: "g1", Index: 100}) + p.SetGroup(&RuleGroup{ID: "g2", Index: 2}) }, map[[2]string]*Rule{{"g1", "id1"}: {GroupID: "g1", ID: "id1", Index: 100}}, map[string]*RuleGroup{"g1": {ID: "g1", Index: 100}}, }, { - func(p *ruleConfigPatch) { - p.deleteRule("g1", "id1") - p.deleteGroup("g2") - p.deleteRule("g3", "id3") - p.deleteGroup("g3") + func(p *RuleConfigPatch) { + p.DeleteRule("g1", "id1") + p.DeleteGroup("g2") + p.DeleteRule("g3", "id3") + p.DeleteGroup("g3") }, map[[2]string]*Rule{{"g1", "id1"}: nil}, map[string]*RuleGroup{"g2": {ID: "g2"}}, }, { - func(p *ruleConfigPatch) { - p.setRule(&Rule{GroupID: "g1", ID: "id2", Index: 200}) - p.setRule(&Rule{GroupID: "g1", ID: "id2"}) - p.setRule(&Rule{GroupID: "g3", ID: "id3"}) - p.deleteRule("g3", "id3") - p.setGroup(&RuleGroup{ID: "g1", Index: 100}) - p.setGroup(&RuleGroup{ID: "g1", Index: 1}) - p.setGroup(&RuleGroup{ID: "g3", Index: 3}) - p.deleteGroup("g3") + func(p *RuleConfigPatch) { + p.SetRule(&Rule{GroupID: "g1", ID: "id2", Index: 200}) + p.SetRule(&Rule{GroupID: "g1", ID: "id2"}) + p.SetRule(&Rule{GroupID: "g3", ID: "id3"}) + p.DeleteRule("g3", "id3") + p.SetGroup(&RuleGroup{ID: "g1", Index: 100}) + p.SetGroup(&RuleGroup{ID: "g1", Index: 1}) + p.SetGroup(&RuleGroup{ID: "g3", Index: 3}) + p.DeleteGroup("g3") }, map[[2]string]*Rule{}, map[string]*RuleGroup{}, diff --git a/pkg/schedule/placement/rule_manager.go b/pkg/schedule/placement/rule_manager.go index 621c52d738e..ea85911462b 100644 --- a/pkg/schedule/placement/rule_manager.go +++ b/pkg/schedule/placement/rule_manager.go @@ -33,6 +33,7 @@ import ( "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/syncutil" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -128,12 +129,17 @@ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolat IsolationLevel: isolationLevel, }) } - for _, defaultRule := range defaultRules { - if err := m.storage.SaveRule(defaultRule.StoreKey(), defaultRule); err != nil { - // TODO: Need to delete the previously successfully saved Rules? - return err + if err := m.storage.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { + for _, defaultRule := range defaultRules { + if err := m.storage.SaveRule(txn, defaultRule.StoreKey(), defaultRule); err != nil { + // TODO: Need to delete the previously successfully saved Rules? + return err + } + m.ruleConfig.setRule(defaultRule) } - m.ruleConfig.setRule(defaultRule) + return nil + }); err != nil { + return err } } m.ruleConfig.adjust() @@ -151,61 +157,66 @@ func (m *RuleManager) loadRules() error { toSave []*Rule toDelete []string ) - err := m.storage.LoadRules(func(k, v string) { - r, err := NewRuleFromJSON([]byte(v)) - if err != nil { - log.Error("failed to unmarshal rule value", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule)) - toDelete = append(toDelete, k) - return - } - err = m.adjustRule(r, "") + return m.storage.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { + err = m.storage.LoadRules(txn, func(k, v string) { + r, err := NewRuleFromJSON([]byte(v)) + if err != nil { + log.Error("failed to unmarshal rule value", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule)) + toDelete = append(toDelete, k) + return + } + err = m.AdjustRule(r, "") + if err != nil { + log.Error("rule is in bad format", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule, err)) + toDelete = append(toDelete, k) + return + } + _, ok := m.ruleConfig.rules[r.Key()] + if ok { + log.Error("duplicated rule key", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule)) + toDelete = append(toDelete, k) + return + } + if k != r.StoreKey() { + log.Error("mismatch data key, need to restore", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule)) + toDelete = append(toDelete, k) + toSave = append(toSave, r) + } + m.ruleConfig.rules[r.Key()] = r + }) if err != nil { - log.Error("rule is in bad format", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule, err)) - toDelete = append(toDelete, k) - return + return err } - _, ok := m.ruleConfig.rules[r.Key()] - if ok { - log.Error("duplicated rule key", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule)) - toDelete = append(toDelete, k) - return + + for _, s := range toSave { + if err = m.storage.SaveRule(txn, s.StoreKey(), s); err != nil { + return err + } } - if k != r.StoreKey() { - log.Error("mismatch data key, need to restore", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule)) - toDelete = append(toDelete, k) - toSave = append(toSave, r) + for _, d := range toDelete { + if err = m.storage.DeleteRule(txn, d); err != nil { + return err + } } - m.ruleConfig.rules[r.Key()] = r + return nil }) - if err != nil { - return err - } - for _, s := range toSave { - if err = m.storage.SaveRule(s.StoreKey(), s); err != nil { - return err - } - } - for _, d := range toDelete { - if err = m.storage.DeleteRule(d); err != nil { - return err - } - } - return nil } func (m *RuleManager) loadGroups() error { - return m.storage.LoadRuleGroups(func(k, v string) { - g, err := NewRuleGroupFromJSON([]byte(v)) - if err != nil { - log.Error("failed to unmarshal rule group", zap.String("group-id", k), errs.ZapError(errs.ErrLoadRuleGroup, err)) - return - } - m.ruleConfig.groups[g.ID] = g + return m.storage.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { + return m.storage.LoadRuleGroups(txn, func(k, v string) { + g, err := NewRuleGroupFromJSON([]byte(v)) + if err != nil { + log.Error("failed to unmarshal rule group", zap.String("group-id", k), errs.ZapError(errs.ErrLoadRuleGroup, err)) + return + } + m.ruleConfig.groups[g.ID] = g + }) }) } -// check and adjust rule from client or storage. -func (m *RuleManager) adjustRule(r *Rule, groupID string) (err error) { +// AdjustRule check and adjust rule from client or storage. +func (m *RuleManager) AdjustRule(r *Rule, groupID string) (err error) { r.StartKey, err = hex.DecodeString(r.StartKeyHex) if err != nil { return errs.ErrHexDecodingString.FastGenByArgs(r.StartKeyHex) @@ -279,6 +290,11 @@ func (m *RuleManager) adjustRule(r *Rule, groupID string) (err error) { func (m *RuleManager) GetRule(group, id string) *Rule { m.RLock() defer m.RUnlock() + return m.GetRuleLocked(group, id) +} + +// GetRuleLocked returns the Rule with the same (group, id). +func (m *RuleManager) GetRuleLocked(group, id string) *Rule { if r := m.ruleConfig.getRule([2]string{group, id}); r != nil { return r.Clone() } @@ -287,14 +303,14 @@ func (m *RuleManager) GetRule(group, id string) *Rule { // SetRule inserts or updates a Rule. func (m *RuleManager) SetRule(rule *Rule) error { - if err := m.adjustRule(rule, ""); err != nil { + if err := m.AdjustRule(rule, ""); err != nil { return err } m.Lock() defer m.Unlock() - p := m.beginPatch() - p.setRule(rule) - if err := m.tryCommitPatch(p); err != nil { + p := m.BeginPatch() + p.SetRule(rule) + if err := m.TryCommitPatch(p); err != nil { return err } log.Info("placement rule updated", zap.String("rule", fmt.Sprint(rule))) @@ -305,9 +321,9 @@ func (m *RuleManager) SetRule(rule *Rule) error { func (m *RuleManager) DeleteRule(group, id string) error { m.Lock() defer m.Unlock() - p := m.beginPatch() - p.deleteRule(group, id) - if err := m.tryCommitPatch(p); err != nil { + p := m.BeginPatch() + p.DeleteRule(group, id) + if err := m.TryCommitPatch(p); err != nil { return err } log.Info("placement rule is removed", zap.String("group", group), zap.String("id", id)) @@ -351,6 +367,11 @@ func (m *RuleManager) GetGroupsCount() int { func (m *RuleManager) GetRulesByGroup(group string) []*Rule { m.RLock() defer m.RUnlock() + return m.GetRulesByGroupLocked(group) +} + +// GetRulesByGroupLocked returns sorted rules of a group. +func (m *RuleManager) GetRulesByGroupLocked(group string) []*Rule { var rules []*Rule for _, r := range m.ruleConfig.rules { if r.GroupID == group { @@ -442,11 +463,13 @@ func (m *RuleManager) CheckIsCachedDirectly(regionID uint64) bool { return ok } -func (m *RuleManager) beginPatch() *ruleConfigPatch { +// BeginPatch returns a patch for multiple changes. +func (m *RuleManager) BeginPatch() *RuleConfigPatch { return m.ruleConfig.beginPatch() } -func (m *RuleManager) tryCommitPatch(patch *ruleConfigPatch) error { +// TryCommitPatch tries to commit a patch. +func (m *RuleManager) TryCommitPatch(patch *RuleConfigPatch) error { patch.adjust() ruleList, err := buildRuleList(patch) @@ -469,49 +492,44 @@ func (m *RuleManager) tryCommitPatch(patch *ruleConfigPatch) error { } func (m *RuleManager) savePatch(p *ruleConfig) error { - // TODO: it is not completely safe - // 1. in case that half of rules applied, error.. we have to cancel persisted rules - // but that may fail too, causing memory/disk inconsistency - // either rely a transaction API, or clients to request again until success - // 2. in case that PD is suddenly down in the loop, inconsistency again - // now we can only rely clients to request again - var err error - for key, r := range p.rules { - if r == nil { - r = &Rule{GroupID: key[0], ID: key[1]} - err = m.storage.DeleteRule(r.StoreKey()) - } else { - err = m.storage.SaveRule(r.StoreKey(), r) - } - if err != nil { - return err - } - } - for id, g := range p.groups { - if g.isDefault() { - err = m.storage.DeleteRuleGroup(id) - } else { - err = m.storage.SaveRuleGroup(id, g) + return m.storage.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { + for key, r := range p.rules { + if r == nil { + r = &Rule{GroupID: key[0], ID: key[1]} + err = m.storage.DeleteRule(txn, r.StoreKey()) + } else { + err = m.storage.SaveRule(txn, r.StoreKey(), r) + } + if err != nil { + return err + } } - if err != nil { - return err + for id, g := range p.groups { + if g.isDefault() { + err = m.storage.DeleteRuleGroup(txn, id) + } else { + err = m.storage.SaveRuleGroup(txn, id, g) + } + if err != nil { + return err + } } - } - return nil + return nil + }) } // SetRules inserts or updates lots of Rules at once. func (m *RuleManager) SetRules(rules []*Rule) error { m.Lock() defer m.Unlock() - p := m.beginPatch() + p := m.BeginPatch() for _, r := range rules { - if err := m.adjustRule(r, ""); err != nil { + if err := m.AdjustRule(r, ""); err != nil { return err } - p.setRule(r) + p.SetRule(r) } - if err := m.tryCommitPatch(p); err != nil { + if err := m.TryCommitPatch(p); err != nil { return err } @@ -546,7 +564,7 @@ func (r RuleOp) String() string { func (m *RuleManager) Batch(todo []RuleOp) error { for _, t := range todo { if t.Action == RuleOpAdd { - err := m.adjustRule(t.Rule, "") + err := m.AdjustRule(t.Rule, "") if err != nil { return err } @@ -556,25 +574,25 @@ func (m *RuleManager) Batch(todo []RuleOp) error { m.Lock() defer m.Unlock() - patch := m.beginPatch() + patch := m.BeginPatch() for _, t := range todo { switch t.Action { case RuleOpAdd: - patch.setRule(t.Rule) + patch.SetRule(t.Rule) case RuleOpDel: if !t.DeleteByIDPrefix { - patch.deleteRule(t.GroupID, t.ID) + patch.DeleteRule(t.GroupID, t.ID) } else { m.ruleConfig.iterateRules(func(r *Rule) { if r.GroupID == t.GroupID && strings.HasPrefix(r.ID, t.ID) { - patch.deleteRule(r.GroupID, r.ID) + patch.DeleteRule(r.GroupID, r.ID) } }) } } } - if err := m.tryCommitPatch(patch); err != nil { + if err := m.TryCommitPatch(patch); err != nil { return err } @@ -608,9 +626,9 @@ func (m *RuleManager) GetRuleGroups() []*RuleGroup { func (m *RuleManager) SetRuleGroup(group *RuleGroup) error { m.Lock() defer m.Unlock() - p := m.beginPatch() - p.setGroup(group) - if err := m.tryCommitPatch(p); err != nil { + p := m.BeginPatch() + p.SetGroup(group) + if err := m.TryCommitPatch(p); err != nil { return err } log.Info("group config updated", zap.String("group", fmt.Sprint(group))) @@ -621,9 +639,9 @@ func (m *RuleManager) SetRuleGroup(group *RuleGroup) error { func (m *RuleManager) DeleteRuleGroup(id string) error { m.Lock() defer m.Unlock() - p := m.beginPatch() - p.deleteGroup(id) - if err := m.tryCommitPatch(p); err != nil { + p := m.BeginPatch() + p.DeleteGroup(id) + if err := m.TryCommitPatch(p); err != nil { return err } log.Info("group config reset", zap.String("group", id)) @@ -681,7 +699,7 @@ func (m *RuleManager) GetGroupBundle(id string) (b GroupBundle) { func (m *RuleManager) SetAllGroupBundles(groups []GroupBundle, override bool) error { m.Lock() defer m.Unlock() - p := m.beginPatch() + p := m.BeginPatch() matchID := func(a string) bool { for _, g := range groups { if g.ID == a { @@ -692,28 +710,28 @@ func (m *RuleManager) SetAllGroupBundles(groups []GroupBundle, override bool) er } for k := range m.ruleConfig.rules { if override || matchID(k[0]) { - p.deleteRule(k[0], k[1]) + p.DeleteRule(k[0], k[1]) } } for id := range m.ruleConfig.groups { if override || matchID(id) { - p.deleteGroup(id) + p.DeleteGroup(id) } } for _, g := range groups { - p.setGroup(&RuleGroup{ + p.SetGroup(&RuleGroup{ ID: g.ID, Index: g.Index, Override: g.Override, }) for _, r := range g.Rules { - if err := m.adjustRule(r, g.ID); err != nil { + if err := m.AdjustRule(r, g.ID); err != nil { return err } - p.setRule(r) + p.SetRule(r) } } - if err := m.tryCommitPatch(p); err != nil { + if err := m.TryCommitPatch(p); err != nil { return err } log.Info("full config reset", zap.String("config", fmt.Sprint(groups))) @@ -725,26 +743,26 @@ func (m *RuleManager) SetAllGroupBundles(groups []GroupBundle, override bool) er func (m *RuleManager) SetGroupBundle(group GroupBundle) error { m.Lock() defer m.Unlock() - p := m.beginPatch() + p := m.BeginPatch() if _, ok := m.ruleConfig.groups[group.ID]; ok { for k := range m.ruleConfig.rules { if k[0] == group.ID { - p.deleteRule(k[0], k[1]) + p.DeleteRule(k[0], k[1]) } } } - p.setGroup(&RuleGroup{ + p.SetGroup(&RuleGroup{ ID: group.ID, Index: group.Index, Override: group.Override, }) for _, r := range group.Rules { - if err := m.adjustRule(r, group.ID); err != nil { + if err := m.AdjustRule(r, group.ID); err != nil { return err } - p.setRule(r) + p.SetRule(r) } - if err := m.tryCommitPatch(p); err != nil { + if err := m.TryCommitPatch(p); err != nil { return err } log.Info("group is reset", zap.String("group", fmt.Sprint(group))) @@ -765,18 +783,18 @@ func (m *RuleManager) DeleteGroupBundle(id string, regex bool) error { matchID = r.MatchString } - p := m.beginPatch() + p := m.BeginPatch() for k := range m.ruleConfig.rules { if matchID(k[0]) { - p.deleteRule(k[0], k[1]) + p.DeleteRule(k[0], k[1]) } } for _, g := range m.ruleConfig.groups { if matchID(g.ID) { - p.deleteGroup(g.ID) + p.DeleteGroup(g.ID) } } - if err := m.tryCommitPatch(p); err != nil { + if err := m.TryCommitPatch(p); err != nil { return err } log.Info("groups are removed", zap.String("id", id), zap.Bool("regexp", regex)) diff --git a/pkg/schedule/placement/rule_manager_test.go b/pkg/schedule/placement/rule_manager_test.go index 0539e935113..5494b3c5a9d 100644 --- a/pkg/schedule/placement/rule_manager_test.go +++ b/pkg/schedule/placement/rule_manager_test.go @@ -91,23 +91,23 @@ func TestAdjustRule(t *testing.T) { {GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: Voter, Count: -1}, {GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: Voter, Count: 3, LabelConstraints: []LabelConstraint{{Op: "foo"}}}, } - re.NoError(manager.adjustRule(&rules[0], "group")) + re.NoError(manager.AdjustRule(&rules[0], "group")) re.Equal([]byte{0x12, 0x3a, 0xbc}, rules[0].StartKey) re.Equal([]byte{0x12, 0x3a, 0xbf}, rules[0].EndKey) - re.Error(manager.adjustRule(&rules[1], "")) + re.Error(manager.AdjustRule(&rules[1], "")) for i := 2; i < len(rules); i++ { - re.Error(manager.adjustRule(&rules[i], "group")) + re.Error(manager.AdjustRule(&rules[i], "group")) } manager.SetKeyType(constant.Table.String()) - re.Error(manager.adjustRule(&Rule{GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: Voter, Count: 3}, "group")) + re.Error(manager.AdjustRule(&Rule{GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: Voter, Count: 3}, "group")) manager.SetKeyType(constant.Txn.String()) - re.Error(manager.adjustRule(&Rule{GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: Voter, Count: 3}, "group")) + re.Error(manager.AdjustRule(&Rule{GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: Voter, Count: 3}, "group")) - re.Error(manager.adjustRule(&Rule{ + re.Error(manager.AdjustRule(&Rule{ GroupID: "group", ID: "id", StartKeyHex: hex.EncodeToString(codec.EncodeBytes([]byte{0})), @@ -116,7 +116,7 @@ func TestAdjustRule(t *testing.T) { Count: 3, }, "group")) - re.Error(manager.adjustRule(&Rule{ + re.Error(manager.AdjustRule(&Rule{ GroupID: "tiflash", ID: "id", StartKeyHex: hex.EncodeToString(codec.EncodeBytes([]byte{0})), diff --git a/pkg/storage/endpoint/rule.go b/pkg/storage/endpoint/rule.go index 80b6fc7c0ff..b18360040ea 100644 --- a/pkg/storage/endpoint/rule.go +++ b/pkg/storage/endpoint/rule.go @@ -14,58 +14,54 @@ package endpoint +import ( + "context" + + "github.com/tikv/pd/pkg/storage/kv" +) + // RuleStorage defines the storage operations on the rule. type RuleStorage interface { + LoadRules(txn kv.Txn, f func(k, v string)) error + SaveRule(txn kv.Txn, ruleKey string, rule interface{}) error + DeleteRule(txn kv.Txn, ruleKey string) error + LoadRuleGroups(txn kv.Txn, f func(k, v string)) error + SaveRuleGroup(txn kv.Txn, groupID string, group interface{}) error + DeleteRuleGroup(txn kv.Txn, groupID string) error + // LoadRule is used only in rule watcher. LoadRule(ruleKey string) (string, error) - LoadRules(f func(k, v string)) error - SaveRule(ruleKey string, rule interface{}) error - SaveRuleJSON(ruleKey, rule string) error - DeleteRule(ruleKey string) error - LoadRuleGroups(f func(k, v string)) error - SaveRuleGroup(groupID string, group interface{}) error - SaveRuleGroupJSON(groupID, group string) error - DeleteRuleGroup(groupID string) error + LoadRegionRules(f func(k, v string)) error SaveRegionRule(ruleKey string, rule interface{}) error - SaveRegionRuleJSON(ruleKey, rule string) error DeleteRegionRule(ruleKey string) error + RunInTxn(ctx context.Context, f func(txn kv.Txn) error) error } var _ RuleStorage = (*StorageEndpoint)(nil) // SaveRule stores a rule cfg to the rulesPath. -func (se *StorageEndpoint) SaveRule(ruleKey string, rule interface{}) error { - return se.saveJSON(ruleKeyPath(ruleKey), rule) -} - -// SaveRuleJSON stores a rule cfg JSON to the rulesPath. -func (se *StorageEndpoint) SaveRuleJSON(ruleKey, rule string) error { - return se.Save(ruleKeyPath(ruleKey), rule) +func (se *StorageEndpoint) SaveRule(txn kv.Txn, ruleKey string, rule interface{}) error { + return saveJSONInTxn(txn, ruleKeyPath(ruleKey), rule) } // DeleteRule removes a rule from storage. -func (se *StorageEndpoint) DeleteRule(ruleKey string) error { - return se.Remove(ruleKeyPath(ruleKey)) +func (se *StorageEndpoint) DeleteRule(txn kv.Txn, ruleKey string) error { + return txn.Remove(ruleKeyPath(ruleKey)) } // LoadRuleGroups loads all rule groups from storage. -func (se *StorageEndpoint) LoadRuleGroups(f func(k, v string)) error { - return se.loadRangeByPrefix(ruleGroupPath+"/", f) +func (se *StorageEndpoint) LoadRuleGroups(txn kv.Txn, f func(k, v string)) error { + return loadRangeByPrefixInTxn(txn, ruleGroupPath+"/", f) } // SaveRuleGroup stores a rule group config to storage. -func (se *StorageEndpoint) SaveRuleGroup(groupID string, group interface{}) error { - return se.saveJSON(ruleGroupIDPath(groupID), group) -} - -// SaveRuleGroupJSON stores a rule group config JSON to storage. -func (se *StorageEndpoint) SaveRuleGroupJSON(groupID, group string) error { - return se.Save(ruleGroupIDPath(groupID), group) +func (se *StorageEndpoint) SaveRuleGroup(txn kv.Txn, groupID string, group interface{}) error { + return saveJSONInTxn(txn, ruleGroupIDPath(groupID), group) } // DeleteRuleGroup removes a rule group from storage. -func (se *StorageEndpoint) DeleteRuleGroup(groupID string) error { - return se.Remove(ruleGroupIDPath(groupID)) +func (se *StorageEndpoint) DeleteRuleGroup(txn kv.Txn, groupID string) error { + return txn.Remove(ruleGroupIDPath(groupID)) } // LoadRegionRules loads region rules from storage. @@ -78,11 +74,6 @@ func (se *StorageEndpoint) SaveRegionRule(ruleKey string, rule interface{}) erro return se.saveJSON(regionLabelKeyPath(ruleKey), rule) } -// SaveRegionRuleJSON saves a region rule JSON to the storage. -func (se *StorageEndpoint) SaveRegionRuleJSON(ruleKey, rule string) error { - return se.Save(regionLabelKeyPath(ruleKey), rule) -} - // DeleteRegionRule removes a region rule from storage. func (se *StorageEndpoint) DeleteRegionRule(ruleKey string) error { return se.Remove(regionLabelKeyPath(ruleKey)) @@ -94,6 +85,6 @@ func (se *StorageEndpoint) LoadRule(ruleKey string) (string, error) { } // LoadRules loads placement rules from storage. -func (se *StorageEndpoint) LoadRules(f func(k, v string)) error { - return se.loadRangeByPrefix(rulesPath+"/", f) +func (se *StorageEndpoint) LoadRules(txn kv.Txn, f func(k, v string)) error { + return loadRangeByPrefixInTxn(txn, rulesPath+"/", f) } diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 0e69986f255..c48c066a2aa 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -559,7 +559,7 @@ func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error { kgm.deleteKeyspaceGroup(groupID) return nil } - postEventFn := func([]*clientv3.Event) error { + postEventsFn := func([]*clientv3.Event) error { // Retry the groups that are not initialized successfully before. for id, group := range kgm.groupUpdateRetryList { delete(kgm.groupUpdateRetryList, id) @@ -576,7 +576,7 @@ func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error { func([]*clientv3.Event) error { return nil }, putFn, deleteFn, - postEventFn, + postEventsFn, clientv3.WithRange(endKey), ) if kgm.loadKeyspaceGroupsTimeout > 0 { diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 0e1b2731474..f6beafee511 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -865,6 +865,16 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) if limit != 0 { limit++ } + if err := lw.preEventsFn([]*clientv3.Event{}); err != nil { + log.Error("run pre event failed in watch loop", zap.String("name", lw.name), + zap.String("key", lw.key), zap.Error(err)) + } + defer func() { + if err := lw.postEventsFn([]*clientv3.Event{}); err != nil { + log.Error("run post event failed in watch loop", zap.String("name", lw.name), + zap.String("key", lw.key), zap.Error(err)) + } + }() for { // Sort by key to get the next key and we don't need to worry about the performance, // Because the default sort is just SortByKey and SortAscend @@ -875,10 +885,6 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) zap.String("key", lw.key), zap.Error(err)) return 0, err } - if err := lw.preEventsFn([]*clientv3.Event{}); err != nil { - log.Error("run pre event failed in watch loop", zap.String("name", lw.name), - zap.String("key", lw.key), zap.Error(err)) - } for i, item := range resp.Kvs { if resp.More && i == len(resp.Kvs)-1 { // The last key is the start key of the next batch. @@ -888,15 +894,15 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) } err = lw.putFn(item) if err != nil { - log.Error("put failed in watch loop when loading", zap.String("name", lw.name), zap.String("key", lw.key), zap.Error(err)) + log.Error("put failed in watch loop when loading", zap.String("name", lw.name), zap.String("watch-key", lw.key), + zap.ByteString("key", item.Key), zap.ByteString("value", item.Value), zap.Error(err)) + } else { + log.Debug("put successfully in watch loop when loading", zap.String("name", lw.name), zap.String("watch-key", lw.key), + zap.ByteString("key", item.Key), zap.ByteString("value", item.Value)) } } // Note: if there are no keys in etcd, the resp.More is false. It also means the load is finished. if !resp.More { - if err := lw.postEventsFn([]*clientv3.Event{}); err != nil { - log.Error("run post event failed in watch loop", zap.String("name", lw.name), - zap.String("key", lw.key), zap.Error(err)) - } return resp.Header.Revision + 1, err } } diff --git a/server/keyspace_service.go b/server/keyspace_service.go index 1718108d73b..11d912a5f54 100644 --- a/server/keyspace_service.go +++ b/server/keyspace_service.go @@ -89,7 +89,10 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques deleteFn := func(kv *mvccpb.KeyValue) error { return nil } - postEventFn := func([]*clientv3.Event) error { + postEventsFn := func([]*clientv3.Event) error { + if len(keyspaces) == 0 { + return nil + } defer func() { keyspaces = keyspaces[:0] }() @@ -112,7 +115,7 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques func([]*clientv3.Event) error { return nil }, putFn, deleteFn, - postEventFn, + postEventsFn, clientv3.WithRange(clientv3.GetPrefixRangeEnd(startKey)), ) watcher.StartWatchLoop() diff --git a/tests/server/api/region_test.go b/tests/server/api/region_test.go index 450995a6e5e..328c0fcd885 100644 --- a/tests/server/api/region_test.go +++ b/tests/server/api/region_test.go @@ -248,8 +248,7 @@ func (suite *regionTestSuite) checkScatterRegions(cluster *tests.TestCluster) { } func (suite *regionTestSuite) TestCheckRegionsReplicated() { - // Fixme: after delete+set rule, the key range will be empty, so the test will fail in api mode. - suite.env.RunTestInPDMode(suite.checkRegionsReplicated) + suite.env.RunTestInTwoModes(suite.checkRegionsReplicated) } func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) { @@ -304,6 +303,14 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) suite.NoError(err) + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + return len(respBundle) == 1 && respBundle[0].ID == "5" + }) + tu.Eventually(re, func() bool { err = tu.ReadGetJSON(re, testDialClient, url, &status) suite.NoError(err) @@ -328,9 +335,19 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) suite.NoError(err) - err = tu.ReadGetJSON(re, testDialClient, url, &status) - suite.NoError(err) - suite.Equal("REPLICATED", status) + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + return len(respBundle) == 1 && len(respBundle[0].Rules) == 2 + }) + + tu.Eventually(re, func() bool { + err = tu.ReadGetJSON(re, testDialClient, url, &status) + suite.NoError(err) + return status == "REPLICATED" + }) // test multiple bundles bundle = append(bundle, placement.GroupBundle{ @@ -347,17 +364,34 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) suite.NoError(err) - err = tu.ReadGetJSON(re, testDialClient, url, &status) - suite.NoError(err) - suite.Equal("INPROGRESS", status) + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + if len(respBundle) != 2 { + return false + } + s1 := respBundle[0].ID == "5" && respBundle[1].ID == "6" + s2 := respBundle[0].ID == "6" && respBundle[1].ID == "5" + return s1 || s2 + }) + + tu.Eventually(re, func() bool { + err = tu.ReadGetJSON(re, testDialClient, url, &status) + suite.NoError(err) + return status == "INPROGRESS" + }) r1 = core.NewTestRegionInfo(2, 1, []byte("a"), []byte("b")) r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 1}, &metapb.Peer{Id: 6, StoreId: 1}, &metapb.Peer{Id: 7, StoreId: 1}) tests.MustPutRegionInfo(re, cluster, r1) - err = tu.ReadGetJSON(re, testDialClient, url, &status) - suite.NoError(err) - suite.Equal("REPLICATED", status) + tu.Eventually(re, func() bool { + err = tu.ReadGetJSON(re, testDialClient, url, &status) + suite.NoError(err) + return status == "REPLICATED" + }) } func (suite *regionTestSuite) checkRegionCount(cluster *tests.TestCluster, count uint64) { diff --git a/tests/server/api/rule_test.go b/tests/server/api/rule_test.go index eaa41cc11bc..af70d5afed9 100644 --- a/tests/server/api/rule_test.go +++ b/tests/server/api/rule_test.go @@ -20,6 +20,9 @@ import ( "fmt" "net/http" "net/url" + "sort" + "strconv" + "sync" "testing" "github.com/pingcap/kvproto/pkg/metapb" @@ -28,6 +31,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/utils/syncutil" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" @@ -777,7 +781,7 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { err := tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 1) - suite.compareBundle(bundles[0], b1) + suite.assertBundleEqual(bundles[0], b1) // Set b2 := placement.GroupBundle{ @@ -797,14 +801,14 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { var bundle placement.GroupBundle err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule/foo", &bundle) suite.NoError(err) - suite.compareBundle(bundle, b2) + suite.assertBundleEqual(bundle, b2) // GetAll again err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 2) - suite.compareBundle(bundles[0], b1) - suite.compareBundle(bundles[1], b2) + suite.assertBundleEqual(bundles[0], b1) + suite.assertBundleEqual(bundles[1], b2) // Delete err = tu.CheckDelete(testDialClient, urlPrefix+"/placement-rule/pd", tu.StatusOK(suite.Require())) @@ -814,7 +818,7 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 1) - suite.compareBundle(bundles[0], b2) + suite.assertBundleEqual(bundles[0], b2) // SetAll b2.Rules = append(b2.Rules, &placement.Rule{GroupID: "foo", ID: "baz", Index: 2, Role: placement.Follower, Count: 1}) @@ -829,9 +833,9 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 3) - suite.compareBundle(bundles[0], b2) - suite.compareBundle(bundles[1], b1) - suite.compareBundle(bundles[2], b3) + suite.assertBundleEqual(bundles[0], b2) + suite.assertBundleEqual(bundles[1], b1) + suite.assertBundleEqual(bundles[2], b3) // Delete using regexp err = tu.CheckDelete(testDialClient, urlPrefix+"/placement-rule/"+url.PathEscape("foo.*")+"?regexp", tu.StatusOK(suite.Require())) @@ -841,7 +845,7 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 1) - suite.compareBundle(bundles[0], b1) + suite.assertBundleEqual(bundles[0], b1) // Set id := "rule-without-group-id" @@ -862,14 +866,14 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { // Get err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule/"+id, &bundle) suite.NoError(err) - suite.compareBundle(bundle, b4) + suite.assertBundleEqual(bundle, b4) // GetAll again err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 2) - suite.compareBundle(bundles[0], b1) - suite.compareBundle(bundles[1], b4) + suite.assertBundleEqual(bundles[0], b1) + suite.assertBundleEqual(bundles[1], b4) // SetAll b5 := placement.GroupBundle{ @@ -890,9 +894,9 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 3) - suite.compareBundle(bundles[0], b1) - suite.compareBundle(bundles[1], b4) - suite.compareBundle(bundles[2], b5) + suite.assertBundleEqual(bundles[0], b1) + suite.assertBundleEqual(bundles[1], b4) + suite.assertBundleEqual(bundles[2], b5) } func (suite *ruleTestSuite) TestBundleBadRequest() { @@ -925,20 +929,315 @@ func (suite *ruleTestSuite) checkBundleBadRequest(cluster *tests.TestCluster) { } } -func (suite *ruleTestSuite) compareBundle(b1, b2 placement.GroupBundle) { - tu.Eventually(suite.Require(), func() bool { - if b2.ID != b1.ID || b2.Index != b1.Index || b2.Override != b1.Override || len(b2.Rules) != len(b1.Rules) { +func (suite *ruleTestSuite) TestLeaderAndVoter() { + suite.env.RunTestInTwoModes(suite.checkLeaderAndVoter) +} + +func (suite *ruleTestSuite) checkLeaderAndVoter(cluster *tests.TestCluster) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + pdAddr := leaderServer.GetAddr() + urlPrefix := fmt.Sprintf("%s%s/api/v1", pdAddr, apiPrefix) + + stores := []*metapb.Store{ + { + Id: 1, + Address: "tikv1", + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + Version: "7.5.0", + Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z1"}}, + }, + { + Id: 2, + Address: "tikv2", + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + Version: "7.5.0", + Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z2"}}, + }, + } + + for _, store := range stores { + tests.MustPutStore(re, cluster, store) + } + + bundles := [][]placement.GroupBundle{ + { + { + ID: "1", + Index: 1, + Rules: []*placement.Rule{ + { + ID: "rule_1", Index: 1, Role: placement.Voter, Count: 1, GroupID: "1", + LabelConstraints: []placement.LabelConstraint{ + {Key: "zone", Op: "in", Values: []string{"z1"}}, + }, + }, + { + ID: "rule_2", Index: 2, Role: placement.Leader, Count: 1, GroupID: "1", + LabelConstraints: []placement.LabelConstraint{ + {Key: "zone", Op: "in", Values: []string{"z2"}}, + }, + }, + }, + }, + }, + { + { + ID: "1", + Index: 1, + Rules: []*placement.Rule{ + { + ID: "rule_1", Index: 1, Role: placement.Leader, Count: 1, GroupID: "1", + LabelConstraints: []placement.LabelConstraint{ + {Key: "zone", Op: "in", Values: []string{"z2"}}, + }, + }, + { + ID: "rule_2", Index: 2, Role: placement.Voter, Count: 1, GroupID: "1", + LabelConstraints: []placement.LabelConstraint{ + {Key: "zone", Op: "in", Values: []string{"z1"}}, + }, + }, + }, + }, + }} + for _, bundle := range bundles { + data, err := json.Marshal(bundle) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) + suite.NoError(err) + + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err := tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + suite.Len(respBundle, 1) + if bundle[0].Rules[0].Role == placement.Leader { + return respBundle[0].Rules[0].Role == placement.Leader + } + if bundle[0].Rules[0].Role == placement.Voter { + return respBundle[0].Rules[0].Role == placement.Voter + } return false - } - for i := range b1.Rules { - if !suite.compareRule(b1.Rules[i], b2.Rules[i]) { + }) + } +} + +func (suite *ruleTestSuite) TestDeleteAndUpdate() { + suite.env.RunTestInTwoModes(suite.checkDeleteAndUpdate) +} + +func (suite *ruleTestSuite) checkDeleteAndUpdate(cluster *tests.TestCluster) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + pdAddr := leaderServer.GetAddr() + urlPrefix := fmt.Sprintf("%s%s/api/v1", pdAddr, apiPrefix) + + bundles := [][]placement.GroupBundle{ + // 1 rule group with 1 rule + {{ + ID: "1", + Index: 1, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 1, Role: placement.Voter, Count: 1, GroupID: "1", + }, + }, + }}, + // 2 rule groups with different range rules + {{ + ID: "1", + Index: 1, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 1, Role: placement.Voter, Count: 1, GroupID: "1", + StartKey: []byte("a"), EndKey: []byte("b"), + }, + }, + }, { + ID: "2", + Index: 2, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 2, Role: placement.Voter, Count: 1, GroupID: "2", + StartKey: []byte("b"), EndKey: []byte("c"), + }, + }, + }}, + // 2 rule groups with 1 rule and 2 rules + {{ + ID: "3", + Index: 3, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 3, Role: placement.Voter, Count: 1, GroupID: "3", + }, + }, + }, { + ID: "4", + Index: 4, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 4, Role: placement.Voter, Count: 1, GroupID: "4", + }, + { + ID: "bar", Index: 6, Role: placement.Voter, Count: 1, GroupID: "4", + }, + }, + }}, + // 1 rule group with 2 rules + {{ + ID: "5", + Index: 5, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 5, Role: placement.Voter, Count: 1, GroupID: "5", + }, + { + ID: "bar", Index: 6, Role: placement.Voter, Count: 1, GroupID: "5", + }, + }, + }}, + } + + for _, bundle := range bundles { + data, err := json.Marshal(bundle) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) + suite.NoError(err) + + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + if len(respBundle) != len(bundle) { return false } - } - return true + sort.Slice(respBundle, func(i, j int) bool { return respBundle[i].ID < respBundle[j].ID }) + sort.Slice(bundle, func(i, j int) bool { return bundle[i].ID < bundle[j].ID }) + for i := range respBundle { + if !suite.compareBundle(respBundle[i], bundle[i]) { + return false + } + } + return true + }) + } +} + +func (suite *ruleTestSuite) TestConcurrency() { + suite.env.RunTestInTwoModes(suite.checkConcurrency) +} + +func (suite *ruleTestSuite) checkConcurrency(cluster *tests.TestCluster) { + // test concurrency of set rule group with different group id + suite.checkConcurrencyWith(cluster, + func(i int) []placement.GroupBundle { + return []placement.GroupBundle{ + { + ID: strconv.Itoa(i), + Index: i, + Rules: []*placement.Rule{ + { + ID: "foo", Index: i, Role: placement.Voter, Count: 1, GroupID: strconv.Itoa(i), + }, + }, + }, + } + }, + func(resp []placement.GroupBundle, i int) bool { + return len(resp) == 1 && resp[0].ID == strconv.Itoa(i) + }, + ) + // test concurrency of set rule with different id + suite.checkConcurrencyWith(cluster, + func(i int) []placement.GroupBundle { + return []placement.GroupBundle{ + { + ID: "pd", + Index: 1, + Rules: []*placement.Rule{ + { + ID: strconv.Itoa(i), Index: i, Role: placement.Voter, Count: 1, GroupID: "pd", + }, + }, + }, + } + }, + func(resp []placement.GroupBundle, i int) bool { + return len(resp) == 1 && resp[0].ID == "pd" && resp[0].Rules[0].ID == strconv.Itoa(i) + }, + ) +} + +func (suite *ruleTestSuite) checkConcurrencyWith(cluster *tests.TestCluster, + genBundle func(int) []placement.GroupBundle, + checkBundle func([]placement.GroupBundle, int) bool) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + pdAddr := leaderServer.GetAddr() + urlPrefix := fmt.Sprintf("%s%s/api/v1", pdAddr, apiPrefix) + expectResult := struct { + syncutil.RWMutex + val int + }{} + wg := sync.WaitGroup{} + + for i := 1; i <= 10; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + bundle := genBundle(i) + data, err := json.Marshal(bundle) + suite.NoError(err) + for j := 0; j < 10; j++ { + expectResult.Lock() + err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) + suite.NoError(err) + expectResult.val = i + expectResult.Unlock() + } + }(i) + } + + wg.Wait() + expectResult.RLock() + defer expectResult.RUnlock() + suite.NotZero(expectResult.val) + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err := tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + suite.Len(respBundle, 1) + return checkBundle(respBundle, expectResult.val) + }) +} + +func (suite *ruleTestSuite) assertBundleEqual(b1, b2 placement.GroupBundle) { + tu.Eventually(suite.Require(), func() bool { + return suite.compareBundle(b1, b2) }) } +func (suite *ruleTestSuite) compareBundle(b1, b2 placement.GroupBundle) bool { + if b2.ID != b1.ID || b2.Index != b1.Index || b2.Override != b1.Override || len(b2.Rules) != len(b1.Rules) { + return false + } + sort.Slice(b1.Rules, func(i, j int) bool { return b1.Rules[i].ID < b1.Rules[j].ID }) + sort.Slice(b2.Rules, func(i, j int) bool { return b2.Rules[i].ID < b2.Rules[j].ID }) + for i := range b1.Rules { + if !suite.compareRule(b1.Rules[i], b2.Rules[i]) { + return false + } + } + return true +} + func (suite *ruleTestSuite) compareRule(r1 *placement.Rule, r2 *placement.Rule) bool { return r2.GroupID == r1.GroupID && r2.ID == r1.ID &&