Skip to content

Commit

Permalink
rule: prevent misconfiguration of placement rules in some situations (t…
Browse files Browse the repository at this point in the history
…ikv#2481) (tikv#2516)

Signed-off-by: disksing <[email protected]>
  • Loading branch information
sre-bot authored Jun 5, 2020
1 parent bbf1354 commit e88c246
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 11 deletions.
2 changes: 1 addition & 1 deletion server/api/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (h *ruleHandler) checkRule(r *placement.Rule) error {
if err != nil {
return errors.Wrap(err, "end key is not hex format")
}
if len(start) > 0 && bytes.Compare(end, start) <= 0 {
if len(end) > 0 && bytes.Compare(end, start) <= 0 {
return errors.New("endKey should be greater than startKey")
}

Expand Down
19 changes: 16 additions & 3 deletions server/schedule/placement/rule_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ package placement

import (
"bytes"
"encoding/hex"
"sort"
"strings"

"github.com/pkg/errors"
)

type splitPointType int
Expand Down Expand Up @@ -68,9 +72,9 @@ type ruleList struct {
ranges []rangeRules // ranges[i] contains rules apply to (ranges[i].startKey, ranges[i+1].startKey).
}

func buildRuleList(rules map[[2]string]*Rule) ruleList {
func buildRuleList(rules map[[2]string]*Rule) (ruleList, error) {
if len(rules) == 0 {
return ruleList{}
return ruleList{}, errors.New("no rule left")
}
// collect and sort split points.
var points []splitPoint
Expand Down Expand Up @@ -105,6 +109,15 @@ func buildRuleList(rules map[[2]string]*Rule) ruleList {
if i == len(points)-1 || !bytes.Equal(p.key, points[i+1].key) {
// next key is different, push sr to rl.
rr := sr.rules
if len(rr) == 0 {
var endKey []byte
if i != len(points)-1 {
endKey = points[i+1].key
}
return ruleList{}, errors.Errorf("no rule for range {%s, %s}",
strings.ToUpper(hex.EncodeToString(p.key)),
strings.ToUpper(hex.EncodeToString(endKey)))
}
if i != len(points)-1 {
rr = append(rr[:0:0], rr...) // clone
}
Expand All @@ -115,7 +128,7 @@ func buildRuleList(rules map[[2]string]*Rule) ruleList {
})
}
}
return rl
return rl, nil
}

func (rl ruleList) getSplitKeys(start, end []byte) [][]byte {
Expand Down
27 changes: 22 additions & 5 deletions server/schedule/placement/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string) error
}
m.rules[defaultRule.Key()] = defaultRule
}
m.ruleList = buildRuleList(m.rules)
ruleList, err := buildRuleList(m.rules)
if err != nil {
return err
}
m.ruleList = ruleList
m.initialized = true
return nil
}
Expand Down Expand Up @@ -169,8 +173,13 @@ func (m *RuleManager) SetRule(rule *Rule) error {
old := m.rules[rule.Key()]
m.rules[rule.Key()] = rule

if err = m.store.SaveRule(rule.StoreKey(), rule); err != nil {
// restore
ruleList, err := buildRuleList(m.rules)
if err == nil {
err = m.store.SaveRule(rule.StoreKey(), rule)
}

// restore
if err != nil {
if old == nil {
delete(m.rules, rule.Key())
} else {
Expand All @@ -179,8 +188,8 @@ func (m *RuleManager) SetRule(rule *Rule) error {
return err
}

m.ruleList = ruleList
log.Info("placement rule updated", zap.Stringer("rule", rule))
m.ruleList = buildRuleList(m.rules)
return nil
}

Expand All @@ -194,13 +203,21 @@ func (m *RuleManager) DeleteRule(group, id string) error {
return nil
}
delete(m.rules, [2]string{group, id})

ruleList, err := buildRuleList(m.rules)
if err != nil {
// restore
m.rules[key] = old
return err
}

if err := m.store.DeleteRule(old.StoreKey()); err != nil {
// restore
m.rules[key] = old
return err
}
m.ruleList = ruleList
log.Info("placement rule removed", zap.Stringer("rule", old))
m.ruleList = buildRuleList(m.rules)
return nil
}

Expand Down
25 changes: 24 additions & 1 deletion server/schedule/placement/rule_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ func (s *testManagerSuite) TestSaveLoad(c *C) {
}

func (s *testManagerSuite) TestKeys(c *C) {
s.manager.DeleteRule("pd", "default")
rules := []*Rule{
{GroupID: "1", ID: "1", Role: "voter", Count: 1, StartKeyHex: "", EndKeyHex: ""},
{GroupID: "2", ID: "2", Role: "voter", Count: 1, StartKeyHex: "11", EndKeyHex: "ff"},
Expand All @@ -100,6 +99,7 @@ func (s *testManagerSuite) TestKeys(c *C) {
for _, r := range rules {
s.manager.SetRule(r)
}
s.manager.DeleteRule("pd", "default")

splitKeys := [][]string{
{"", "", "11", "22", "44", "dd", "ee", "ff"},
Expand Down Expand Up @@ -161,6 +161,29 @@ func (s *testManagerSuite) TestKeys(c *C) {
}
}

func (s *testManagerSuite) TestRangeGap(c *C) {
// |-- default --|
// cannot delete the last rule
err := s.manager.DeleteRule("pd", "default")
c.Assert(err, NotNil)

err = s.manager.SetRule(&Rule{GroupID: "pd", ID: "foo", StartKeyHex: "", EndKeyHex: "abcd", Role: "voter", Count: 1})
c.Assert(err, IsNil)
// |-- default --|
// |-- foo --|
// still cannot delete default since it will cause ("abcd", "") has no rules inside.
err = s.manager.DeleteRule("pd", "default")
c.Assert(err, NotNil)
err = s.manager.SetRule(&Rule{GroupID: "pd", ID: "bar", StartKeyHex: "abcd", EndKeyHex: "", Role: "voter", Count: 1})
c.Assert(err, IsNil)
// now default can be deleted.
err = s.manager.DeleteRule("pd", "default")
c.Assert(err, IsNil)
// cannot change range since it will cause ("abaa", "abcd") has no rules inside.
err = s.manager.SetRule(&Rule{GroupID: "pd", ID: "foo", StartKeyHex: "", EndKeyHex: "abaa", Role: "voter", Count: 1})
c.Assert(err, NotNil)
}

func (s *testManagerSuite) dhex(hk string) []byte {
k, err := hex.DecodeString(hk)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion tools/pd-ctl/pdctl/command/config_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,10 @@ func putPlacementRulesFunc(cmd *cobra.Command, args []string) {
return
}
fmt.Printf("saved rule %s/%s\n", r.GroupID, r.ID)
} else {
}
}
for _, r := range rules {
if r.Count == 0 {
_, err = doRequest(cmd, path.Join(rulePrefix, r.GroupID, r.ID), http.MethodDelete)
if err != nil {
fmt.Printf("failed to delete rule %s/%s: %v\n", r.GroupID, r.ID, err)
Expand Down

0 comments on commit e88c246

Please sign in to comment.