Skip to content

Commit

Permalink
Merge branch 'master' into fix-err-code
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Sep 13, 2023
2 parents 92b5cc0 + d0cebac commit be000c6
Show file tree
Hide file tree
Showing 28 changed files with 507 additions and 249 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27
github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/common v0.26.0
github.com/sasha-s/go-deadlock v0.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM=
github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 h1:K9lZMYuDuAiR5kOjFESwJ8KfSb4ui5zX6vZGbUp58uk=
github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27/go.mod h1:OUzFMMVjR1GKlf4LWLqza9QNKjCrYJ7stVn/3PN0djM=
github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511 h1:oyrCfNlAWmLlUfEr+7YTSBo29SP/J1N8hnxBt5yUABo=
github.com/pingcap/tidb-dashboard v0.0.0-20230911054332-22add1e00511/go.mod h1:EZ90+V5S4TttbYag6oKZ3jcNKRwZe1Mc9vXwOt9JBYw=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
Expand Down
8 changes: 4 additions & 4 deletions metrics/grafana/pd.json
Original file line number Diff line number Diff line change
Expand Up @@ -7094,14 +7094,14 @@
"expr": "-sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-leader-scheduler\"}[1m])) by (source)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "store-{{store}}",
"legendFormat": "store-{{source}}",
"refId": "A"
},
{
"expr": "sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-leader-scheduler\"}[1m])) by (target)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "store-{{store}}",
"legendFormat": "store-{{target}}",
"refId": "B"
}
],
Expand Down Expand Up @@ -7198,14 +7198,14 @@
"expr": "-sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-region-scheduler\"}[1m])) by (source)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "store-{{store}}",
"legendFormat": "store-{{source}}",
"refId": "A"
},
{
"expr": "sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-region-scheduler\"}[1m])) by (target)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "store-{{store}}",
"legendFormat": "store-{{target}}",
"refId": "B"
}
],
Expand Down
21 changes: 0 additions & 21 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
Expand Down Expand Up @@ -204,8 +203,6 @@ type PersistConfig struct {
schedule atomic.Value
replication atomic.Value
storeConfig atomic.Value
// Store the respective configurations for different schedulers.
schedulerConfig sync.Map
}

// NewPersistConfig creates a new PersistConfig instance.
Expand Down Expand Up @@ -275,24 +272,6 @@ func (o *PersistConfig) GetStoreConfig() *sc.StoreConfig {
return o.storeConfig.Load().(*sc.StoreConfig)
}

// SetSchedulerConfig sets the scheduler configuration with the given name.
func (o *PersistConfig) SetSchedulerConfig(name, data string) {
o.schedulerConfig.Store(name, data)
}

// RemoveSchedulerConfig removes the scheduler configuration with the given name.
func (o *PersistConfig) RemoveSchedulerConfig(name string) {
o.schedulerConfig.Delete(name)
}

// GetSchedulerConfig returns the scheduler configuration with the given name.
func (o *PersistConfig) GetSchedulerConfig(name string) string {
if v, ok := o.schedulerConfig.Load(name); ok {
return v.(string)
}
return ""
}

// GetMaxReplicas returns the max replicas.
func (o *PersistConfig) GetMaxReplicas() int {
return int(o.GetReplicationConfig().MaxReplicas)
Expand Down
16 changes: 11 additions & 5 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/coreos/go-semver/semver"
"github.com/pingcap/log"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
Expand Down Expand Up @@ -50,6 +51,9 @@ type Watcher struct {
schedulerConfigWatcher *etcdutil.LoopWatcher

*PersistConfig
// Some data, like the scheduler configs, should be loaded into the storage
// to make sure the coordinator could access them correctly.
storage storage.Storage
}

type persistedConfig struct {
Expand All @@ -65,6 +69,7 @@ func NewWatcher(
etcdClient *clientv3.Client,
clusterID uint64,
persistConfig *PersistConfig,
storage storage.Storage,
) (*Watcher, error) {
ctx, cancel := context.WithCancel(ctx)
cw := &Watcher{
Expand All @@ -74,6 +79,7 @@ func NewWatcher(
schedulerConfigPathPrefix: endpoint.SchedulerConfigPathPrefix(clusterID),
etcdClient: etcdClient,
PersistConfig: persistConfig,
storage: storage,
}
err := cw.initializeConfigWatcher()
if err != nil {
Expand Down Expand Up @@ -120,15 +126,15 @@ func (cw *Watcher) initializeConfigWatcher() error {
func (cw *Watcher) initializeSchedulerConfigWatcher() error {
prefixToTrim := cw.schedulerConfigPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
cw.SetSchedulerConfig(
return cw.storage.SaveScheduleConfig(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
kv.Value,
)
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
cw.RemoveSchedulerConfig(strings.TrimPrefix(string(kv.Key), prefixToTrim))
return nil
return cw.storage.RemoveScheduleConfig(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
)
}
postEventFn := func() error {
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,11 +432,11 @@ func (s *Server) startServer() (err error) {

func (s *Server) startCluster(context.Context) error {
s.basicCluster = core.NewBasicCluster()
s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
err := s.startWatcher()
if err != nil {
return err
}
s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster)
s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh)
if err != nil {
Expand All @@ -458,7 +458,7 @@ func (s *Server) startWatcher() (err error) {
if err != nil {
return err
}
s.configWatcher, err = config.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.persistConfig)
s.configWatcher, err = config.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.persistConfig, s.storage)
if err != nil {
return err
}
Expand Down
33 changes: 15 additions & 18 deletions pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,7 @@ func (m *Participant) setLeader(member participant) {

// unsetLeader unsets the member's leader.
func (m *Participant) unsetLeader() {
var leader participant
switch m.serviceName {
case utils.TSOServiceName:
leader = &tsopb.Participant{}
case utils.SchedulingServiceName:
leader = &schedulingpb.Participant{}
case utils.ResourceManagerServiceName:
leader = &resource_manager.Participant{}
}
leader := NewParticipantByService(m.serviceName)
m.leader.Store(leader)
m.lastLeaderUpdatedTime.Store(time.Now())
}
Expand Down Expand Up @@ -225,15 +217,7 @@ func (m *Participant) PreCheckLeader() error {

// getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func (m *Participant) getPersistentLeader() (participant, int64, error) {
var leader participant
switch m.serviceName {
case utils.TSOServiceName:
leader = &tsopb.Participant{}
case utils.SchedulingServiceName:
leader = &schedulingpb.Participant{}
case utils.ResourceManagerServiceName:
leader = &resource_manager.Participant{}
}
leader := NewParticipantByService(m.serviceName)
ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader)
if err != nil {
return nil, 0, err
Expand Down Expand Up @@ -399,3 +383,16 @@ func (m *Participant) campaignCheck() bool {
func (m *Participant) SetCampaignChecker(checker leadershipCheckFunc) {
m.campaignChecker.Store(checker)
}

// NewParticipantByService creates a new participant by service name.
func NewParticipantByService(serviceName string) (p participant) {
switch serviceName {
case utils.TSOServiceName:
p = &tsopb.Participant{}
case utils.SchedulingServiceName:
p = &schedulingpb.Participant{}
case utils.ResourceManagerServiceName:
p = &resource_manager.Participant{}
}
return p
}
5 changes: 5 additions & 0 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,8 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
if err = c.schedulers.AddScheduler(s); err != nil {
log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
}
} else if err = c.schedulers.AddSchedulerHandler(s); err != nil {
log.Error("can not add scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
}
}

Expand Down Expand Up @@ -472,6 +474,8 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
scheduleCfg.Schedulers[k] = schedulerCfg
k++
}
} else if err = c.schedulers.AddSchedulerHandler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
}
}

Expand Down Expand Up @@ -507,6 +511,7 @@ func (c *Coordinator) LoadPlugin(pluginPath string, ch chan string) {
return
}
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()))
// TODO: handle the plugin in API service mode.
if err = c.schedulers.AddScheduler(s); err != nil {
log.Error("can't add scheduler", zap.String("scheduler-name", s.GetName()), errs.ZapError(err))
return
Expand Down
6 changes: 4 additions & 2 deletions pkg/schedule/schedulers/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,10 @@ func ConfigSliceDecoder(name string, args []string) ConfigDecoder {
// CreateSchedulerFunc is for creating scheduler.
type CreateSchedulerFunc func(opController *operator.Controller, storage endpoint.ConfigStorage, dec ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error)

var schedulerMap = make(map[string]CreateSchedulerFunc)
var schedulerArgsToDecoder = make(map[string]ConfigSliceDecoderBuilder)
var (
schedulerMap = make(map[string]CreateSchedulerFunc)
schedulerArgsToDecoder = make(map[string]ConfigSliceDecoderBuilder)
)

// RegisterScheduler binds a scheduler creator. It should be called in init()
// func of a package.
Expand Down
80 changes: 67 additions & 13 deletions pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,28 @@ var denySchedulersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues
// Controller is used to manage all schedulers.
type Controller struct {
sync.RWMutex
wg sync.WaitGroup
ctx context.Context
cluster sche.SchedulerCluster
storage endpoint.ConfigStorage
schedulers map[string]*ScheduleController
opController *operator.Controller
wg sync.WaitGroup
ctx context.Context
cluster sche.SchedulerCluster
storage endpoint.ConfigStorage
// schedulers is used to manage all schedulers, which will only be initialized
// and used in the PD leader service mode now.
schedulers map[string]*ScheduleController
// schedulerHandlers is used to manage the HTTP handlers of schedulers,
// which will only be initialized and used in the API service mode now.
schedulerHandlers map[string]http.Handler
opController *operator.Controller
}

// NewController creates a scheduler controller.
func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage endpoint.ConfigStorage, opController *operator.Controller) *Controller {
return &Controller{
ctx: ctx,
cluster: cluster,
storage: storage,
schedulers: make(map[string]*ScheduleController),
opController: opController,
ctx: ctx,
cluster: cluster,
storage: storage,
schedulers: make(map[string]*ScheduleController),
schedulerHandlers: make(map[string]http.Handler),
opController: opController,
}
}

Expand Down Expand Up @@ -86,6 +92,9 @@ func (c *Controller) GetSchedulerNames() []string {
func (c *Controller) GetSchedulerHandlers() map[string]http.Handler {
c.RLock()
defer c.RUnlock()
if len(c.schedulerHandlers) > 0 {
return c.schedulerHandlers
}
handlers := make(map[string]http.Handler, len(c.schedulers))
for name, scheduler := range c.schedulers {
handlers[name] = scheduler.Scheduler
Expand Down Expand Up @@ -117,6 +126,50 @@ func (c *Controller) ResetSchedulerMetrics() {
schedulerStatusGauge.Reset()
}

// AddSchedulerHandler adds the HTTP handler for a scheduler.
func (c *Controller) AddSchedulerHandler(scheduler Scheduler, args ...string) error {
c.Lock()
defer c.Unlock()

name := scheduler.GetName()
if _, ok := c.schedulerHandlers[name]; ok {
return errs.ErrSchedulerExisted.FastGenByArgs()
}

c.schedulerHandlers[name] = scheduler
c.cluster.GetSchedulerConfig().AddSchedulerCfg(scheduler.GetType(), args)
return nil
}

// RemoveSchedulerHandler removes the HTTP handler for a scheduler.
func (c *Controller) RemoveSchedulerHandler(name string) error {
c.Lock()
defer c.Unlock()
if c.cluster == nil {
return errs.ErrNotBootstrapped.FastGenByArgs()
}
s, ok := c.schedulerHandlers[name]
if !ok {
return errs.ErrSchedulerNotFound.FastGenByArgs()
}

conf := c.cluster.GetSchedulerConfig()
conf.RemoveSchedulerCfg(s.(Scheduler).GetType())
if err := conf.Persist(c.storage); err != nil {
log.Error("the option can not persist scheduler config", errs.ZapError(err))
return err
}

if err := c.storage.RemoveScheduleConfig(name); err != nil {
log.Error("can not remove the scheduler config", errs.ZapError(err))
return err
}

delete(c.schedulerHandlers, name)

return nil
}

// AddScheduler adds a scheduler.
func (c *Controller) AddScheduler(scheduler Scheduler, args ...string) error {
c.Lock()
Expand Down Expand Up @@ -249,8 +302,9 @@ func (c *Controller) IsSchedulerExisted(name string) (bool, error) {
if c.cluster == nil {
return false, errs.ErrNotBootstrapped.FastGenByArgs()
}
_, ok := c.schedulers[name]
if !ok {
_, existScheduler := c.schedulers[name]
_, existHandler := c.schedulerHandlers[name]
if !existScheduler && !existHandler {
return false, errs.ErrSchedulerNotFound.FastGenByArgs()
}
return true, nil
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,12 @@ func KeyspaceGroupPrimaryPath(rootPath string, keyspaceGroupID uint32) string {
return path.Join(electionPath, utils.PrimaryKey)
}

// SchedulingPrimaryPath returns the path of scheduling primary.
// Path: /ms/{cluster_id}/scheduling/primary
func SchedulingPrimaryPath(clusterID uint64) string {
return path.Join(SchedulingSvcRootPath(clusterID), utils.PrimaryKey)
}

// KeyspaceGroupsElectionPath returns the path of keyspace groups election.
// default keyspace group: "/ms/{cluster_id}/tso/00000".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}".
Expand Down
Loading

0 comments on commit be000c6

Please sign in to comment.