Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Sep 23, 2024
1 parent 921ed79 commit 30563b7
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 95 deletions.
12 changes: 7 additions & 5 deletions pkg/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

package core

import "bytes"
import (
"bytes"

"github.com/tikv/pd/pkg/core/constant"
)

// BasicCluster provides basic data member and interface for a tikv cluster.
type BasicCluster struct {
Expand Down Expand Up @@ -137,10 +141,8 @@ type StoreSetInformer interface {

// StoreSetController is used to control stores' status.
type StoreSetController interface {
PauseLeaderTransferIn(id uint64) error
ResumeLeaderTransferIn(id uint64)
PauseLeaderTransferOut(id uint64) error
ResumeLeaderTransferOut(id uint64)
PauseLeaderTransfer(id uint64, d constant.Direction) error
ResumeLeaderTransfer(id uint64, d constant.Direction)

SlowStoreEvicted(id uint64) error
SlowStoreRecovered(id uint64)
Expand Down
10 changes: 10 additions & 0 deletions pkg/core/constant/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,13 @@ func StringToKeyType(input string) KeyType {
panic("invalid key type: " + input)
}
}

// Direction distinguishes different kinds of direction.
type Direction int

const (
// In indicates that the direction is in.
In Direction = iota
// Out indicates that the direction is out.
Out
)
53 changes: 15 additions & 38 deletions pkg/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,55 +782,32 @@ func (s *StoresInfo) ResetStores() {
s.stores = make(map[uint64]*StoreInfo)
}

// PauseLeaderTransferIn pauses a StoreInfo with storeID. The store can not be selected
// as target of TransferLeader again.
func (s *StoresInfo) PauseLeaderTransferIn(storeID uint64) error {
// PauseLeaderTransfer pauses a StoreInfo with storeID. The store can not be selected
// as source or target of TransferLeader.
func (s *StoresInfo) PauseLeaderTransfer(storeID uint64, direction constant.Direction) error {
s.Lock()
defer s.Unlock()
store, ok := s.stores[storeID]
if !ok {
return errs.ErrStoreNotFound.FastGenByArgs(storeID)
}
if !store.AllowLeaderTransferIn() {
return errs.ErrPauseLeaderTransferIn.FastGenByArgs(storeID)
switch direction {
case constant.In:
if !store.AllowLeaderTransferIn() {
return errs.ErrPauseLeaderTransferIn.FastGenByArgs(storeID)
}
case constant.Out:
if !store.AllowLeaderTransferOut() {
return errs.ErrPauseLeaderTransferOut.FastGenByArgs(storeID)
}
}
s.stores[storeID] = store.Clone(PauseLeaderTransferIn())
s.stores[storeID] = store.Clone(PauseLeaderTransfer(direction))
return nil
}

// ResumeLeaderTransferIn cleans a store's pause state. The store can be selected
// as target of TransferLeader again.
func (s *StoresInfo) ResumeLeaderTransferIn(storeID uint64) {
s.Lock()
defer s.Unlock()
store, ok := s.stores[storeID]
if !ok {
log.Warn("try to clean a store's pause state, but it is not found. It may be cleanup",
zap.Uint64("store-id", storeID))
return
}
s.stores[storeID] = store.Clone(ResumeLeaderTransferIn())
}

// PauseLeaderTransferOut pauses a StoreInfo with storeID. The store can not be selected
// as source of TransferLeader again.
func (s *StoresInfo) PauseLeaderTransferOut(storeID uint64) error {
s.Lock()
defer s.Unlock()
store, ok := s.stores[storeID]
if !ok {
return errs.ErrStoreNotFound.FastGenByArgs(storeID)
}
if !store.AllowLeaderTransferOut() {
return errs.ErrPauseLeaderTransferOut.FastGenByArgs(storeID)
}
s.stores[storeID] = store.Clone(PauseLeaderTransferOut())
return nil
}

// ResumeLeaderTransferOut cleans a store's pause state. The store can be selected
// as source of TransferLeader again.
func (s *StoresInfo) ResumeLeaderTransferOut(storeID uint64) {
func (s *StoresInfo) ResumeLeaderTransfer(storeID uint64, direction constant.Direction) {
s.Lock()
defer s.Unlock()
store, ok := s.stores[storeID]
Expand All @@ -839,7 +816,7 @@ func (s *StoresInfo) ResumeLeaderTransferOut(storeID uint64) {
zap.Uint64("store-id", storeID))
return
}
s.stores[storeID] = store.Clone(ResumeLeaderTransferOut())
s.stores[storeID] = store.Clone(ResumeLeaderTransfer(direction))
}

// SlowStoreEvicted marks a store as a slow store and prevents transferring
Expand Down
23 changes: 17 additions & 6 deletions pkg/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/utils/typeutil"
)
Expand Down Expand Up @@ -98,17 +99,27 @@ func SetStoreState(state metapb.StoreState, physicallyDestroyed ...bool) StoreCr
}
}

// PauseLeaderTransferIn prevents the store from been selected as target store of TransferLeader.
func PauseLeaderTransferIn() StoreCreateOption {
// PauseLeaderTransfer prevents the store from been selected as source or target store of TransferLeader.
func PauseLeaderTransfer(d constant.Direction) StoreCreateOption {
return func(store *StoreInfo) {
store.pauseLeaderTransferIn = true
switch d {
case constant.In:
store.pauseLeaderTransferIn = true
case constant.Out:
store.pauseLeaderTransferOut = true
}
}
}

// ResumeLeaderTransferIn cleans a store's pause state. The store can be selected as target of TransferLeader again.
func ResumeLeaderTransferIn() StoreCreateOption {
// ResumeLeaderTransfer cleans a store's pause state. The store can be selected as source or target of TransferLeader again.
func ResumeLeaderTransfer(d constant.Direction) StoreCreateOption {
return func(store *StoreInfo) {
store.pauseLeaderTransferIn = false
switch d {
case constant.In:
store.pauseLeaderTransferIn = false
case constant.Out:
store.pauseLeaderTransferOut = false
}
}
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mock/mockid"
Expand Down Expand Up @@ -567,9 +568,9 @@ func (mc *Cluster) UpdateStoreLeaderWeight(storeID uint64, weight float64) {
func (mc *Cluster) SetStoreEvictLeader(storeID uint64, enableEvictLeader bool) {
store := mc.GetStore(storeID)
if enableEvictLeader {
mc.PutStore(store.Clone(core.PauseLeaderTransferIn()))
mc.PutStore(store.Clone(core.PauseLeaderTransfer(constant.In)))
} else {
mc.PutStore(store.Clone(core.ResumeLeaderTransferIn()))
mc.PutStore(store.Clone(core.ResumeLeaderTransfer(constant.In)))
}
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/schedule/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ func (conf *evictLeaderSchedulerConfig) removeStoreLocked(id uint64) (bool, erro
_, exists := conf.StoreIDWithRanges[id]
if exists {
delete(conf.StoreIDWithRanges, id)
conf.cluster.ResumeLeaderTransferIn(id)
conf.cluster.ResumeLeaderTransfer(id, constant.In)
return len(conf.StoreIDWithRanges) == 0, nil
}
return false, errs.ErrScheduleConfigNotExist.FastGenByArgs()
}

func (conf *evictLeaderSchedulerConfig) resetStoreLocked(id uint64, keyRange []core.KeyRange) {
if err := conf.cluster.PauseLeaderTransferIn(id); err != nil {
if err := conf.cluster.PauseLeaderTransfer(id, constant.In); err != nil {
log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err))
}
conf.StoreIDWithRanges[id] = keyRange
Expand Down Expand Up @@ -138,7 +138,7 @@ func (conf *evictLeaderSchedulerConfig) reloadConfig() error {
if err := conf.load(newCfg); err != nil {
return err
}
pauseAndResumeLeaderTransferIn(conf.cluster, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
pauseAndResumeLeaderTransfer(conf.cluster, constant.In, conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
conf.StoreIDWithRanges = newCfg.StoreIDWithRanges
conf.Batch = newCfg.Batch
return nil
Expand All @@ -149,7 +149,7 @@ func (conf *evictLeaderSchedulerConfig) pauseLeaderTransfer(cluster sche.Schedul
defer conf.RUnlock()
var res error
for id := range conf.StoreIDWithRanges {
if err := cluster.PauseLeaderTransferIn(id); err != nil {
if err := cluster.PauseLeaderTransfer(id, constant.In); err != nil {
res = err
}
}
Expand All @@ -160,15 +160,15 @@ func (conf *evictLeaderSchedulerConfig) resumeLeaderTransfer(cluster sche.Schedu
conf.RLock()
defer conf.RUnlock()
for id := range conf.StoreIDWithRanges {
cluster.ResumeLeaderTransferIn(id)
cluster.ResumeLeaderTransfer(id, constant.In)
}
}

func (conf *evictLeaderSchedulerConfig) pauseLeaderTransferIfStoreNotExist(id uint64) (bool, error) {
conf.RLock()
defer conf.RUnlock()
if _, exist := conf.StoreIDWithRanges[id]; !exist {
if err := conf.cluster.PauseLeaderTransferIn(id); err != nil {
if err := conf.cluster.PauseLeaderTransfer(id, constant.In); err != nil {
return exist, err
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/schedule/schedulers/evict_slow_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
Expand Down Expand Up @@ -201,7 +202,7 @@ func (s *evictSlowStoreScheduler) ReloadConfig() error {
for _, id := range newCfg.EvictedStores {
new[id] = struct{}{}
}
pauseAndResumeLeaderTransferIn(s.conf.cluster, old, new)
pauseAndResumeLeaderTransfer(s.conf.cluster, constant.In, old, new)
s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap
s.conf.EvictedStores = newCfg.EvictedStores
return nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/schedule/schedulers/evict_slow_trend.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
Expand Down Expand Up @@ -301,7 +302,7 @@ func (s *evictSlowTrendScheduler) ReloadConfig() error {
for _, id := range newCfg.EvictedStores {
new[id] = struct{}{}
}
pauseAndResumeLeaderTransferIn(s.conf.cluster, old, new)
pauseAndResumeLeaderTransfer(s.conf.cluster, constant.In, old, new)
s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap
s.conf.EvictedStores = newCfg.EvictedStores
return nil
Expand Down
12 changes: 6 additions & 6 deletions pkg/schedule/schedulers/grant_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (conf *grantLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last
succ, last = false, false
if exists {
delete(conf.StoreIDWithRanges, id)
conf.cluster.ResumeLeaderTransferOut(id)
conf.cluster.ResumeLeaderTransfer(id, constant.Out)
succ = true
last = len(conf.StoreIDWithRanges) == 0
}
Expand All @@ -109,7 +109,7 @@ func (conf *grantLeaderSchedulerConfig) removeStore(id uint64) (succ bool, last
func (conf *grantLeaderSchedulerConfig) resetStore(id uint64, keyRange []core.KeyRange) {
conf.Lock()
defer conf.Unlock()
if err := conf.cluster.PauseLeaderTransferOut(id); err != nil {
if err := conf.cluster.PauseLeaderTransfer(id, constant.Out); err != nil {
log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err))
}
conf.StoreIDWithRanges[id] = keyRange
Expand Down Expand Up @@ -171,7 +171,7 @@ func (s *grantLeaderScheduler) ReloadConfig() error {
if err := s.conf.load(newCfg); err != nil {
return err
}
pauseAndResumeLeaderTransferOut(s.conf.cluster, s.conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
pauseAndResumeLeaderTransfer(s.conf.cluster, constant.Out, s.conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
s.conf.StoreIDWithRanges = newCfg.StoreIDWithRanges
return nil
}
Expand All @@ -182,7 +182,7 @@ func (s *grantLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) erro
defer s.conf.RUnlock()
var res error
for id := range s.conf.StoreIDWithRanges {
if err := cluster.PauseLeaderTransferOut(id); err != nil {
if err := cluster.PauseLeaderTransfer(id, constant.Out); err != nil {
res = err
}
}
Expand All @@ -194,7 +194,7 @@ func (s *grantLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) {
s.conf.RLock()
defer s.conf.RUnlock()
for id := range s.conf.StoreIDWithRanges {
cluster.ResumeLeaderTransferOut(id)
cluster.ResumeLeaderTransfer(id, constant.Out)
}
}

Expand Down Expand Up @@ -252,7 +252,7 @@ func (handler *grantLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R
id = (uint64)(idFloat)
handler.config.RLock()
if _, exists = handler.config.StoreIDWithRanges[id]; !exists {
if err := handler.config.cluster.PauseLeaderTransferOut(id); err != nil {
if err := handler.config.cluster.PauseLeaderTransfer(id, constant.Out); err != nil {
handler.config.RUnlock()
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
Expand Down
26 changes: 4 additions & 22 deletions pkg/schedule/schedulers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,38 +391,20 @@ func (q *retryQuota) gc(keepStores []*core.StoreInfo) {
}
}

// pauseAndResumeLeaderTransferIn checks the old and new store IDs, and pause or resume the leader transfer in.
func pauseAndResumeLeaderTransferIn[T any](cluster *core.BasicCluster, old, new map[uint64]T) {
// pauseAndResumeLeaderTransfer checks the old and new store IDs, and pause or resume the leader transfer in or out.
func pauseAndResumeLeaderTransfer[T any](cluster *core.BasicCluster, direction constant.Direction, old, new map[uint64]T) {
for id := range old {
if _, ok := new[id]; ok {
continue
}
cluster.ResumeLeaderTransferIn(id)
cluster.ResumeLeaderTransfer(id, direction)
}
for id := range new {
if _, ok := old[id]; ok {
continue
}
if err := cluster.PauseLeaderTransferIn(id); err != nil {
if err := cluster.PauseLeaderTransfer(id, direction); err != nil {
log.Error("pause leader transfer in failed", zap.Uint64("store-id", id), errs.ZapError(err))
}
}
}

// pauseAndResumeLeaderTransferOut checks the old and new store IDs, and pause or resume the leader transfer out.
func pauseAndResumeLeaderTransferOut[T any](cluster *core.BasicCluster, old, new map[uint64]T) {
for id := range old {
if _, ok := new[id]; ok {
continue
}
cluster.ResumeLeaderTransferOut(id)
}
for id := range new {
if _, ok := old[id]; ok {
continue
}
if err := cluster.PauseLeaderTransferOut(id); err != nil {
log.Error("pause leader transfer out failed", zap.Uint64("store-id", id), errs.ZapError(err))
}
}
}
Loading

0 comments on commit 30563b7

Please sign in to comment.