Skip to content

Commit

Permalink
save work
Browse files Browse the repository at this point in the history
Signed-off-by: okJiang <[email protected]>
  • Loading branch information
okJiang committed Aug 8, 2024
1 parent 56f082d commit 86ad195
Show file tree
Hide file tree
Showing 32 changed files with 137 additions and 117 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -216,4 +216,4 @@ issues:
- errcheck
include:
# remove the comment after the path is ready
# - EXC0012
- EXC0012
1 change: 1 addition & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,7 @@ func (c *client) UpdateOption(option DynamicOption, value any) error {
return nil
}

// GetAllMembers gets the members Info from PD.
func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
start := time.Now()
defer func() { cmdDurationGetAllMembers.Observe(time.Since(start).Seconds()) }()
Expand Down
1 change: 1 addition & 0 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type server struct {
*scheserver.Server
}

// GetCluster returns the SchedulerCluster.
func (s *server) GetCluster() sche.SchedulerCluster {
return s.Server.GetCluster()
}
Expand Down
1 change: 1 addition & 0 deletions pkg/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
// In order to avoid the patrolRegionScanLimit to be too big or too small, it will be limited to [128,8192].
// It takes about 10s to iterate 1,024,000 regions(with DefaultPatrolRegionInterval=10ms) where other steps are not considered.
MinPatrolRegionScanLimit = 128
// MaxPatrolScanRegionLimit is the max limit of regions to scan.
MaxPatrolScanRegionLimit = 8192
patrolRegionPartition = 1024
)
Expand Down
3 changes: 2 additions & 1 deletion pkg/schedule/prepare_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,14 @@ func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...ti
return true
}

// IsPrepared returns whether the store is prepared.
func (checker *prepareChecker) IsPrepared() bool {
checker.RLock()
defer checker.RUnlock()
return checker.prepared
}

// for test purpose
// SetPrepared is for test purpose
func (checker *prepareChecker) SetPrepared() {
checker.Lock()
defer checker.Unlock()
Expand Down
9 changes: 7 additions & 2 deletions pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (conf *balanceLeaderSchedulerConfig) validateLocked() bool {
return conf.Batch >= 1 && conf.Batch <= 10
}

func (conf *balanceLeaderSchedulerConfig) Clone() *balanceLeaderSchedulerConfig {
func (conf *balanceLeaderSchedulerConfig) clone() *balanceLeaderSchedulerConfig {
conf.RLock()
defer conf.RUnlock()
ranges := make([]core.KeyRange, len(conf.Ranges))
Expand Down Expand Up @@ -159,7 +159,7 @@ func (handler *balanceLeaderHandler) updateConfig(w http.ResponseWriter, r *http
}

func (handler *balanceLeaderHandler) listConfig(w http.ResponseWriter, _ *http.Request) {
conf := handler.config.Clone()
conf := handler.config.clone()
handler.rd.JSON(w, http.StatusOK, conf)
}

Expand Down Expand Up @@ -192,6 +192,7 @@ func newBalanceLeaderScheduler(opController *operator.Controller, conf *balanceL
return s
}

// ServeHTTP implements the http.Handler interface.
func (l *balanceLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
l.handler.ServeHTTP(w, r)
}
Expand All @@ -206,12 +207,14 @@ func WithBalanceLeaderName(name string) BalanceLeaderCreateOption {
}
}

// EncodeConfig implements the Scheduler interface.
func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) {
l.conf.RLock()
defer l.conf.RUnlock()
return EncodeConfig(l.conf)
}

// ReloadConfig implements the Scheduler interface.
func (l *balanceLeaderScheduler) ReloadConfig() error {
l.conf.Lock()
defer l.conf.Unlock()
Expand All @@ -231,6 +234,7 @@ func (l *balanceLeaderScheduler) ReloadConfig() error {
return nil
}

// GetName implements the Scheduler interface.
func (l *balanceLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
allowed := l.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit()
if !allowed {
Expand Down Expand Up @@ -331,6 +335,7 @@ func (cs *candidateStores) resortStoreWithPos(pos int) {
}
}

// Schedule implements the Scheduler interface.
func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
basePlan := plan.NewBalanceSchedulerPlan()
var collector *plan.Collector
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/balance_leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestBalanceLeaderSchedulerConfigClone(t *testing.T) {
Ranges: keyRanges1,
Batch: 10,
}
conf2 := conf.Clone()
conf2 := conf.clone()
re.Equal(conf.Batch, conf2.Batch)
re.Equal(conf.Ranges, conf2.Ranges)

Expand Down
8 changes: 4 additions & 4 deletions pkg/schedule/schedulers/balance_witness.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type balanceWitnessSchedulerConfig struct {
Batch int `json:"batch"`
}

func (conf *balanceWitnessSchedulerConfig) Update(data []byte) (int, any) {
func (conf *balanceWitnessSchedulerConfig) update(data []byte) (int, any) {
conf.Lock()
defer conf.Unlock()

Expand Down Expand Up @@ -99,7 +99,7 @@ func (conf *balanceWitnessSchedulerConfig) validateLocked() bool {
return conf.Batch >= 1 && conf.Batch <= 10
}

func (conf *balanceWitnessSchedulerConfig) Clone() *balanceWitnessSchedulerConfig {
func (conf *balanceWitnessSchedulerConfig) clone() *balanceWitnessSchedulerConfig {
conf.RLock()
defer conf.RUnlock()
ranges := make([]core.KeyRange, len(conf.Ranges))
Expand Down Expand Up @@ -151,12 +151,12 @@ func newBalanceWitnessHandler(conf *balanceWitnessSchedulerConfig) http.Handler
func (handler *balanceWitnessHandler) updateConfig(w http.ResponseWriter, r *http.Request) {
data, _ := io.ReadAll(r.Body)
r.Body.Close()
httpCode, v := handler.config.Update(data)
httpCode, v := handler.config.update(data)
handler.rd.JSON(w, httpCode, v)
}

func (handler *balanceWitnessHandler) listConfig(w http.ResponseWriter, _ *http.Request) {
conf := handler.config.Clone()
conf := handler.config.clone()
handler.rd.JSON(w, http.StatusOK, conf)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (conf *evictLeaderSchedulerConfig) getBatch() int {
return conf.Batch
}

func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig {
func (conf *evictLeaderSchedulerConfig) clone() *evictLeaderSchedulerConfig {
conf.RLock()
defer conf.RUnlock()
storeIDWithRanges := make(map[uint64][]core.KeyRange)
Expand Down Expand Up @@ -462,7 +462,7 @@ func (handler *evictLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R
}

func (handler *evictLeaderHandler) listConfig(w http.ResponseWriter, _ *http.Request) {
conf := handler.config.Clone()
conf := handler.config.clone()
handler.rd.JSON(w, http.StatusOK, conf)
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/schedule/schedulers/evict_leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,19 @@ func TestConfigClone(t *testing.T) {
re := require.New(t)

emptyConf := &evictLeaderSchedulerConfig{StoreIDWithRanges: make(map[uint64][]core.KeyRange)}
con2 := emptyConf.Clone()
con2 := emptyConf.clone()
re.Empty(con2.getKeyRangesByID(1))

con2.StoreIDWithRanges[1], _ = getKeyRanges([]string{"a", "b", "c", "d"})
con3 := con2.Clone()
con3 := con2.clone()
re.Equal(len(con3.getRanges(1)), len(con2.getRanges(1)))

con3.StoreIDWithRanges[1][0].StartKey = []byte("aaa")
con4 := con3.Clone()
con4 := con3.clone()
re.True(bytes.Equal(con4.StoreIDWithRanges[1][0].StartKey, con3.StoreIDWithRanges[1][0].StartKey))

con4.Batch = 10
con5 := con4.Clone()
con5 := con4.clone()
re.Equal(con5.getBatch(), con4.getBatch())
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/schedule/schedulers/evict_slow_trend.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func initEvictSlowTrendSchedulerConfig(storage endpoint.ConfigStorage) *evictSlo
}
}

func (conf *evictSlowTrendSchedulerConfig) Clone() *evictSlowTrendSchedulerConfig {
func (conf *evictSlowTrendSchedulerConfig) clone() *evictSlowTrendSchedulerConfig {
conf.RLock()
defer conf.RUnlock()
return &evictSlowTrendSchedulerConfig{
Expand Down Expand Up @@ -268,7 +268,7 @@ func (handler *evictSlowTrendHandler) updateConfig(w http.ResponseWriter, r *htt
}

func (handler *evictSlowTrendHandler) listConfig(w http.ResponseWriter, _ *http.Request) {
conf := handler.config.Clone()
conf := handler.config.clone()
handler.rd.JSON(w, http.StatusOK, conf)
}

Expand All @@ -278,6 +278,7 @@ type evictSlowTrendScheduler struct {
handler http.Handler
}

// GetNextInterval implements the Scheduler interface.
func (s *evictSlowTrendScheduler) GetNextInterval(time.Duration) time.Duration {
var growthType intervalGrowthType
// If it already found a slow node as candidate, the next interval should be shorter
Expand Down
5 changes: 3 additions & 2 deletions pkg/schedule/schedulers/grant_hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (conf *grantHotRegionSchedulerConfig) clone() *grantHotRegionSchedulerConfi
}
}

func (conf *grantHotRegionSchedulerConfig) Persist() error {
func (conf *grantHotRegionSchedulerConfig) persist() error {
conf.RLock()
defer conf.RUnlock()
data, err := EncodeConfig(conf)
Expand Down Expand Up @@ -217,7 +217,7 @@ func (handler *grantHotRegionHandler) updateConfig(w http.ResponseWriter, r *htt
return
}

if err = handler.config.Persist(); err != nil {
if err = handler.config.persist(); err != nil {
handler.config.setStoreLeaderID(0)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
Expand All @@ -241,6 +241,7 @@ func newGrantHotRegionHandler(config *grantHotRegionSchedulerConfig) http.Handle
return router
}

// Schedule implements the Scheduler interface.
func (s *grantHotRegionScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) {
grantHotRegionCounter.Inc()
typ := s.randomType()
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/grant_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type grantLeaderSchedulerConfig struct {
removeSchedulerCb func(name string) error
}

func (conf *grantLeaderSchedulerConfig) BuildWithArgs(args []string) error {
func (conf *grantLeaderSchedulerConfig) buildWithArgs(args []string) error {
if len(args) != 1 {
return errs.ErrSchedulerConfig.FastGenByArgs("id")
}
Expand Down Expand Up @@ -287,7 +287,7 @@ func (handler *grantLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R
args = append(args, handler.config.getRanges(id)...)
}

err := handler.config.BuildWithArgs(args)
err := handler.config.buildWithArgs(args)
if err != nil {
handler.rd.JSON(w, http.StatusBadRequest, err.Error())
return
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/shuffle_hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type shuffleHotRegionSchedulerConfig struct {
Limit uint64 `json:"limit"`
}

func (conf *shuffleHotRegionSchedulerConfig) Clone() *shuffleHotRegionSchedulerConfig {
func (conf *shuffleHotRegionSchedulerConfig) clone() *shuffleHotRegionSchedulerConfig {
conf.RLock()
defer conf.RUnlock()
return &shuffleHotRegionSchedulerConfig{
Expand Down Expand Up @@ -240,7 +240,7 @@ func (handler *shuffleHotRegionHandler) updateConfig(w http.ResponseWriter, r *h
}

func (handler *shuffleHotRegionHandler) listConfig(w http.ResponseWriter, _ *http.Request) {
conf := handler.config.Clone()
conf := handler.config.clone()
handler.rd.JSON(w, http.StatusOK, conf)
}

Expand Down
1 change: 1 addition & 0 deletions pkg/schedule/schedulers/split_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func newSplitBucketScheduler(opController *operator.Controller, conf *splitBucke
return ret
}

// ReloadConfig implement Scheduler interface.
func (s *splitBucketScheduler) ReloadConfig() error {
s.conf.Lock()
defer s.conf.Unlock()
Expand Down
Loading

0 comments on commit 86ad195

Please sign in to comment.