scheduling/watcher, storage: integrate rule watcher with the managers (tikv#7213)

ref tikv#5839

Integrate the rule watcher with the in-memory managers. Placement rule and rule group changes watched from the PD API server are now applied to the rule manager and fed to the checker controller as suspect key ranges, and region label rule changes are applied to the region labeler, instead of only being persisted to the rule storage.

Signed-off-by: JmPotato <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
JmPotato and ti-chi-bot[bot] authored Oct 17, 2023
1 parent c9a97a8 commit a85f29c
Showing 6 changed files with 132 additions and 104 deletions.
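
At a high level, the change turns the watcher callbacks from plain JSON writes into calls on the in-memory managers. Below is a minimal sketch of the new put handler for placement rules, condensed from the watcher.go diff that follows; logging, the delete path, and the LoopWatcher wiring are omitted, so treat it as an illustration rather than the exact committed code.

    putFn := func(kv *mvccpb.KeyValue) error {
        rule, err := placement.NewRuleFromJSON(kv.Value)
        if err != nil {
            return err
        }
        // Mark both the new and the previous key range as suspect so the checker re-examines them.
        rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
        if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil {
            rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey)
        }
        // Apply the rule to the in-memory manager instead of persisting the raw JSON.
        return rw.ruleManager.SetRule(rule)
    }

The rule group and region label handlers follow the same pattern, calling SetRuleGroup on the rule manager and SetLabelRule on the region labeler respectively.
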
83 changes: 62 additions & 21 deletions pkg/mcs/scheduling/server/rule/watcher.go
@@ -20,6 +20,9 @@ import (
"sync"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/schedule/checker"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
@@ -49,18 +52,27 @@ type Watcher struct {
etcdClient *clientv3.Client
ruleStorage endpoint.RuleStorage

// checkerController is used to add the suspect key ranges to the checker when the rule changed.
checkerController *checker.Controller
// ruleManager is used to manage the placement rules.
ruleManager *placement.RuleManager
// regionLabeler is used to manage the region label rules.
regionLabeler *labeler.RegionLabeler

ruleWatcher *etcdutil.LoopWatcher
groupWatcher *etcdutil.LoopWatcher
labelWatcher *etcdutil.LoopWatcher
}

// NewWatcher creates a new watcher to watch the Placement Rule change from PD API server.
// Please use `GetRuleStorage` to get the underlying storage to access the Placement Rules.
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
clusterID uint64,
ruleStorage endpoint.RuleStorage,
checkerController *checker.Controller,
ruleManager *placement.RuleManager,
regionLabeler *labeler.RegionLabeler,
) (*Watcher, error) {
ctx, cancel := context.WithCancel(ctx)
rw := &Watcher{
@@ -71,6 +83,9 @@ func NewWatcher(
regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID),
etcdClient: etcdClient,
ruleStorage: ruleStorage,
checkerController: checkerController,
ruleManager: ruleManager,
regionLabeler: regionLabeler,
}
err := rw.initializeRuleWatcher()
if err != nil {
@@ -90,17 +105,31 @@ func NewWatcher(
func (rw *Watcher) initializeRuleWatcher() error {
prefixToTrim := rw.rulesPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
// Since the PD API server will validate the rule before saving it to etcd,
// so we could directly save the string rule in JSON to the storage here.
log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStorage.SaveRuleJSON(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
rule, err := placement.NewRuleFromJSON(kv.Value)
if err != nil {
return err
}
// Update the suspect key ranges in the checker.
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil {
rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey)
}
return rw.ruleManager.SetRule(rule)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete placement rule", zap.String("key", string(kv.Key)))
return rw.ruleStorage.DeleteRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
key := string(kv.Key)
log.Info("delete placement rule", zap.String("key", key))
ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, prefixToTrim))
if err != nil {
return err
}
rule, err := placement.NewRuleFromJSON([]byte(ruleJSON))
if err != nil {
return err
}
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
return rw.ruleManager.DeleteRule(rule.GroupID, rule.ID)
}
postEventFn := func() error {
return nil
@@ -120,14 +149,24 @@ func (rw *Watcher) initializeGroupWatcher() error {
prefixToTrim := rw.ruleGroupPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStorage.SaveRuleGroupJSON(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value)
if err != nil {
return err
}
// Add all rule key ranges within the group to the suspect key ranges.
for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rw.ruleManager.SetRuleGroup(ruleGroup)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete placement rule group", zap.String("key", string(kv.Key)))
return rw.ruleStorage.DeleteRuleGroup(strings.TrimPrefix(string(kv.Key), prefixToTrim))
key := string(kv.Key)
log.Info("delete placement rule group", zap.String("key", key))
trimmedKey := strings.TrimPrefix(key, prefixToTrim)
for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rw.ruleManager.DeleteRuleGroup(trimmedKey)
}
postEventFn := func() error {
return nil
@@ -147,14 +186,16 @@ func (rw *Watcher) initializeRegionLabelWatcher() error {
prefixToTrim := rw.regionLabelPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStorage.SaveRegionRuleJSON(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
rule, err := labeler.NewLabelRuleFromJSON(kv.Value)
if err != nil {
return err
}
return rw.regionLabeler.SetLabelRule(rule)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete region label rule", zap.String("key", string(kv.Key)))
return rw.ruleStorage.DeleteRegionRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
key := string(kv.Key)
log.Info("delete region label rule", zap.String("key", key))
return rw.regionLabeler.DeleteLabelRule(strings.TrimPrefix(key, prefixToTrim))
}
postEventFn := func() error {
return nil
17 changes: 14 additions & 3 deletions pkg/mcs/scheduling/server/server.go
@@ -455,7 +455,7 @@ func (s *Server) startServer() (err error) {
func (s *Server) startCluster(context.Context) error {
s.basicCluster = core.NewBasicCluster()
s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
err := s.startWatcher()
err := s.startMetaConfWatcher()
if err != nil {
return err
}
@@ -464,7 +464,13 @@ func (s *Server) startCluster(context.Context) error {
if err != nil {
return err
}
// Inject the cluster components into the config watcher after the scheduler controller is created.
s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController())
// Start the rule watcher after the cluster is created.
err = s.startRuleWatcher()
if err != nil {
return err
}
s.cluster.StartBackgroundJobs()
return nil
}
@@ -474,7 +480,7 @@ func (s *Server) stopCluster() {
s.stopWatcher()
}

func (s *Server) startWatcher() (err error) {
func (s *Server) startMetaConfWatcher() (err error) {
s.metaWatcher, err = meta.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.basicCluster)
if err != nil {
return err
@@ -483,7 +489,12 @@ if err != nil {
if err != nil {
return err
}
s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.storage)
return err
}

func (s *Server) startRuleWatcher() (err error) {
s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.storage,
s.cluster.GetCoordinator().GetCheckerController(), s.cluster.GetRuleManager(), s.cluster.GetRegionLabeler())
return err
}

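
The split into startMetaConfWatcher and startRuleWatcher exists because the rule watcher now needs components that only exist after the cluster is created. The resulting startup order in startCluster looks roughly like the sketch below, simplified from the diff above; error handling is abbreviated and the cluster construction itself is elided, as it is in the diff.

    // Phase 1: watchers that only need storage and the basic cluster.
    if err := s.startMetaConfWatcher(); err != nil {
        return err
    }
    // ... create s.cluster (not shown in this diff) ...
    // Phase 2: wiring that depends on the cluster's coordinator.
    s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController())
    if err := s.startRuleWatcher(); err != nil {
        return err
    }
    s.cluster.StartBackgroundJobs()
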
2 changes: 1 addition & 1 deletion pkg/schedule/labeler/labeler.go
@@ -254,7 +254,7 @@ func (l *RegionLabeler) Patch(patch LabelRulePatch) error {
}
}

// update inmemory states.
// update in-memory states.
l.Lock()
defer l.Unlock()

6 changes: 4 additions & 2 deletions pkg/schedule/placement/rule_manager.go
@@ -135,8 +135,10 @@ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolat
}

func (m *RuleManager) loadRules() error {
var toSave []*Rule
var toDelete []string
var (
toSave []*Rule
toDelete []string
)
err := m.storage.LoadRules(func(k, v string) {
r, err := NewRuleFromJSON([]byte(v))
if err != nil {
6 changes: 6 additions & 0 deletions pkg/storage/endpoint/rule.go
@@ -22,6 +22,7 @@ import (

// RuleStorage defines the storage operations on the rule.
type RuleStorage interface {
LoadRule(ruleKey string) (string, error)
LoadRules(f func(k, v string)) error
SaveRule(ruleKey string, rule interface{}) error
SaveRuleJSON(ruleKey, rule string) error
@@ -93,6 +94,11 @@ func (se *StorageEndpoint) DeleteRegionRule(ruleKey string) error {
return se.Remove(regionLabelKeyPath(ruleKey))
}

// LoadRule loads a placement rule from storage.
func (se *StorageEndpoint) LoadRule(ruleKey string) (string, error) {
return se.Load(ruleKeyPath(ruleKey))
}

// LoadRules loads placement rules from storage.
func (se *StorageEndpoint) LoadRules(f func(k, v string)) error {
return se.loadRangeByPrefix(rulesPath+"/", f)