Skip to content

Commit

Permalink
scheduling: watch the respective scheduler config and trim the prefix (
Browse files Browse the repository at this point in the history
…tikv#6955)

ref tikv#5839

- Make scheduling service watch the respective scheduler config.
- Trim the key prefix inside the watcher.

Signed-off-by: JmPotato <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and rleungx committed Dec 1, 2023
1 parent 5e876d4 commit 6902699
Show file tree
Hide file tree
Showing 10 changed files with 382 additions and 73 deletions.
2 changes: 1 addition & 1 deletion pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func (manager *Manager) splitKeyspaceRegion(id uint32, waitRegionSplit bool) (er

start := time.Now()
skipRaw := manager.config.GetDisableRawKVRegionSplit()
keyspaceRule := makeLabelRule(id, skipRaw)
keyspaceRule := MakeLabelRule(id, skipRaw)
cl, ok := manager.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler })
if !ok {
return errors.New("cluster does not support region label")
Expand Down
4 changes: 2 additions & 2 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ func getRegionLabelID(id uint32) string {
return regionLabelIDPrefix + strconv.FormatUint(uint64(id), endpoint.SpaceIDBase)
}

// makeLabelRule makes the label rule for the given keyspace id.
func makeLabelRule(id uint32, skipRaw bool) *labeler.LabelRule {
// MakeLabelRule makes the label rule for the given keyspace id.
func MakeLabelRule(id uint32, skipRaw bool) *labeler.LabelRule {
return &labeler.LabelRule{
ID: getRegionLabelID(id),
Index: 0,
Expand Down
2 changes: 1 addition & 1 deletion pkg/keyspace/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,6 @@ func TestMakeLabelRule(t *testing.T) {
},
}
for _, testCase := range testCases {
re.Equal(testCase.expectedLabelRule, makeLabelRule(testCase.id, testCase.skipRaw))
re.Equal(testCase.expectedLabelRule, MakeLabelRule(testCase.id, testCase.skipRaw))
}
}
22 changes: 22 additions & 0 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
Expand Down Expand Up @@ -192,10 +193,13 @@ func (c *Config) validate() error {
// PersistConfig wraps all configurations that need to persist to storage and
// allows to access them safely.
type PersistConfig struct {
// Store the global configuration that is related to the scheduling.
clusterVersion unsafe.Pointer
schedule atomic.Value
replication atomic.Value
storeConfig atomic.Value
// Store the respective configurations for different schedulers.
schedulerConfig sync.Map
}

// NewPersistConfig creates a new PersistConfig instance.
Expand Down Expand Up @@ -253,6 +257,24 @@ func (o *PersistConfig) GetStoreConfig() *sc.StoreConfig {
return o.storeConfig.Load().(*sc.StoreConfig)
}

// SetSchedulerConfig sets the scheduler configuration with the given name.
func (o *PersistConfig) SetSchedulerConfig(name, data string) {
o.schedulerConfig.Store(name, data)
}

// RemoveSchedulerConfig removes the scheduler configuration with the given name.
func (o *PersistConfig) RemoveSchedulerConfig(name string) {
o.schedulerConfig.Delete(name)
}

// GetSchedulerConfig returns the scheduler configuration with the given name.
func (o *PersistConfig) GetSchedulerConfig(name string) string {
if v, ok := o.schedulerConfig.Load(name); ok {
return v.(string)
}
return ""
}

// GetMaxReplicas returns the max replicas.
func (o *PersistConfig) GetMaxReplicas() int {
return int(o.GetReplicationConfig().MaxReplicas)
Expand Down
90 changes: 66 additions & 24 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ package config
import (
"context"
"encoding/json"
"strings"
"sync"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/log"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
Expand All @@ -34,11 +36,20 @@ type Watcher struct {
ctx context.Context
cancel context.CancelFunc

etcdClient *clientv3.Client
watcher *etcdutil.LoopWatcher
// configPath is the path of the configuration in etcd:
// - Key: /pd/{cluster_id}/config
// - Value: configuration JSON.
configPath string
// schedulerConfigPathPrefix is the path prefix of the scheduler configuration in etcd:
// - Key: /pd/{cluster_id}/scheduler_config/{scheduler_name}
// - Value: configuration JSON.
schedulerConfigPathPrefix string

etcdClient *clientv3.Client
configWatcher *etcdutil.LoopWatcher
schedulerConfigWatcher *etcdutil.LoopWatcher

*PersistConfig
// TODO: watch the scheduler config change.
}

type persistedConfig struct {
Expand All @@ -52,19 +63,30 @@ type persistedConfig struct {
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
// configPath is the path of the configuration in etcd:
// - Key: /pd/{cluster_id}/config
// - Value: configuration JSON.
configPath string,
clusterID uint64,
persistConfig *PersistConfig,
) (*Watcher, error) {
ctx, cancel := context.WithCancel(ctx)
cw := &Watcher{
ctx: ctx,
cancel: cancel,
etcdClient: etcdClient,
PersistConfig: persistConfig,
ctx: ctx,
cancel: cancel,
configPath: endpoint.ConfigPath(clusterID),
schedulerConfigPathPrefix: endpoint.SchedulerConfigPathPrefix(clusterID),
etcdClient: etcdClient,
PersistConfig: persistConfig,
}
err := cw.initializeConfigWatcher()
if err != nil {
return nil, err
}
err = cw.initializeSchedulerConfigWatcher()
if err != nil {
return nil, err
}
return cw, nil
}

func (cw *Watcher) initializeConfigWatcher() error {
putFn := func(kv *mvccpb.KeyValue) error {
cfg := &persistedConfig{}
if err := json.Unmarshal(kv.Value, cfg); err != nil {
Expand All @@ -84,21 +106,41 @@ func NewWatcher(
postEventFn := func() error {
return nil
}
cw.watcher = etcdutil.NewLoopWatcher(
ctx,
&cw.wg,
etcdClient,
"scheduling-config-watcher",
configPath,
putFn,
deleteFn,
postEventFn,
cw.configWatcher = etcdutil.NewLoopWatcher(
cw.ctx, &cw.wg,
cw.etcdClient,
"scheduling-config-watcher", cw.configPath,
putFn, deleteFn, postEventFn,
)
cw.watcher.StartWatchLoop()
if err := cw.watcher.WaitLoad(); err != nil {
return nil, err
cw.configWatcher.StartWatchLoop()
return cw.configWatcher.WaitLoad()
}

func (cw *Watcher) initializeSchedulerConfigWatcher() error {
prefixToTrim := cw.schedulerConfigPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
cw.SetSchedulerConfig(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
return nil
}
return cw, nil
deleteFn := func(kv *mvccpb.KeyValue) error {
cw.RemoveSchedulerConfig(strings.TrimPrefix(string(kv.Key), prefixToTrim))
return nil
}
postEventFn := func() error {
return nil
}
cw.schedulerConfigWatcher = etcdutil.NewLoopWatcher(
cw.ctx, &cw.wg,
cw.etcdClient,
"scheduling-scheduler-config-watcher", cw.schedulerConfigPathPrefix,
putFn, deleteFn, postEventFn,
clientv3.WithPrefix(),
)
cw.schedulerConfigWatcher.StartWatchLoop()
return cw.schedulerConfigWatcher.WaitLoad()
}

// Close closes the watcher.
Expand Down
80 changes: 50 additions & 30 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package rule

import (
"context"
"strings"
"sync"

"github.com/tikv/pd/pkg/storage/endpoint"
Expand Down Expand Up @@ -44,7 +45,7 @@ func (rs *ruleStorage) LoadRules(f func(k, v string)) error {
return nil
}

// SaveRule stores a rule cfg to the rulesPath.
// SaveRule stores a rule cfg to the rulesPathPrefix.
func (rs *ruleStorage) SaveRule(ruleKey string, rule interface{}) error {
rs.rules.Store(ruleKey, rule)
return nil
Expand Down Expand Up @@ -104,6 +105,19 @@ type Watcher struct {
cancel context.CancelFunc
wg sync.WaitGroup

// rulesPathPrefix:
// - Key: /pd/{cluster_id}/rules/{group_id}-{rule_id}
// - Value: placement.Rule
rulesPathPrefix string
// ruleGroupPathPrefix:
// - Key: /pd/{cluster_id}/rule_group/{group_id}
// - Value: placement.RuleGroup
ruleGroupPathPrefix string
// regionLabelPathPrefix:
// - Key: /pd/{cluster_id}/region_label/{rule_id}
// - Value: labeler.LabelRule
regionLabelPathPrefix string

etcdClient *clientv3.Client
ruleStore *ruleStorage

Expand All @@ -117,97 +131,103 @@ type Watcher struct {
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
// rulePath:
// - Key: /pd/{cluster_id}/rules/{group_id}-{rule_id}
// - Value: placement.Rule
// ruleGroupPath:
// - Key: /pd/{cluster_id}/rule_group/{group_id}
// - Value: placement.RuleGroup
// regionLabelPath:
// - Key: /pd/{cluster_id}/region_label/{rule_id}
// - Value: labeler.LabelRule
rulesPath, ruleGroupPath, regionLabelPath string,
clusterID uint64,
) (*Watcher, error) {
ctx, cancel := context.WithCancel(ctx)
rw := &Watcher{
ctx: ctx,
cancel: cancel,
etcdClient: etcdClient,
ruleStore: &ruleStorage{},
ctx: ctx,
cancel: cancel,
rulesPathPrefix: endpoint.RulesPathPrefix(clusterID),
ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID),
regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID),
etcdClient: etcdClient,
ruleStore: &ruleStorage{},
}
err := rw.initializeRuleWatcher(rulesPath)
err := rw.initializeRuleWatcher()
if err != nil {
return nil, err
}
err = rw.initializeGroupWatcher(ruleGroupPath)
err = rw.initializeGroupWatcher()
if err != nil {
return nil, err
}
err = rw.initializeRegionLabelWatcher(regionLabelPath)
err = rw.initializeRegionLabelWatcher()
if err != nil {
return nil, err
}
return rw, nil
}

func (rw *Watcher) initializeRuleWatcher(rulePath string) error {
func (rw *Watcher) initializeRuleWatcher() error {
prefixToTrim := rw.rulesPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
// Since the PD API server will validate the rule before saving it to etcd,
// so we could directly save the string rule in JSON to the storage here.
return rw.ruleStore.SaveRule(string(kv.Key), string(kv.Value))
return rw.ruleStore.SaveRule(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
return rw.ruleStore.DeleteRule(string(kv.Key))
return rw.ruleStore.DeleteRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
}
postEventFn := func() error {
return nil
}
rw.ruleWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-rule-watcher", rulePath,
"scheduling-rule-watcher", rw.rulesPathPrefix,
putFn, deleteFn, postEventFn,
clientv3.WithPrefix(),
)
rw.ruleWatcher.StartWatchLoop()
return rw.ruleWatcher.WaitLoad()
}

func (rw *Watcher) initializeGroupWatcher(ruleGroupPath string) error {
func (rw *Watcher) initializeGroupWatcher() error {
prefixToTrim := rw.ruleGroupPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
return rw.ruleStore.SaveRuleGroup(string(kv.Key), string(kv.Value))
return rw.ruleStore.SaveRuleGroup(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
return rw.ruleStore.DeleteRuleGroup(string(kv.Key))
return rw.ruleStore.DeleteRuleGroup(strings.TrimPrefix(string(kv.Key), prefixToTrim))
}
postEventFn := func() error {
return nil
}
rw.groupWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-rule-group-watcher", ruleGroupPath,
"scheduling-rule-group-watcher", rw.ruleGroupPathPrefix,
putFn, deleteFn, postEventFn,
clientv3.WithPrefix(),
)
rw.groupWatcher.StartWatchLoop()
return rw.groupWatcher.WaitLoad()
}

func (rw *Watcher) initializeRegionLabelWatcher(regionLabelPath string) error {
func (rw *Watcher) initializeRegionLabelWatcher() error {
prefixToTrim := rw.regionLabelPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
return rw.ruleStore.SaveRegionRule(string(kv.Key), string(kv.Value))
return rw.ruleStore.SaveRegionRule(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
return rw.ruleStore.DeleteRegionRule(string(kv.Key))
return rw.ruleStore.DeleteRegionRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
}
postEventFn := func() error {
return nil
}
rw.labelWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-region-label-watcher", regionLabelPath,
"scheduling-region-label-watcher", rw.regionLabelPathPrefix,
putFn, deleteFn, postEventFn,
clientv3.WithPrefix(),
)
Expand Down
9 changes: 2 additions & 7 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,18 +550,13 @@ func (s *Server) startServer() (err error) {

func (s *Server) startWatcher() (err error) {
s.configWatcher, err = config.NewWatcher(
s.ctx, s.etcdClient,
endpoint.ConfigPath(s.clusterID),
s.persistConfig,
s.ctx, s.etcdClient, s.clusterID, s.persistConfig,
)
if err != nil {
return err
}
s.ruleWatcher, err = rule.NewWatcher(
s.ctx, s.etcdClient,
endpoint.RulesPath(s.clusterID),
endpoint.RuleGroupPath(s.clusterID),
endpoint.RegionLabelPath(s.clusterID),
s.ctx, s.etcdClient, s.clusterID,
)
return err
}
Expand Down
Loading

0 comments on commit 6902699

Please sign in to comment.