Skip to content

Commit

Permalink
enlarge store limit when preparing store
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Aug 8, 2023
1 parent ebd2cfe commit 61d9710
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 36 deletions.
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ func (o *PersistConfig) GetStoreLimit(storeID uint64) (returnSC sc.StoreLimitCon
}
cfg := o.GetScheduleConfig().Clone()
sc := sc.StoreLimitConfig{
AddPeer: sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer),
RemovePeer: sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer),
AddPeer: sc.DefaultServingStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer),
RemovePeer: sc.DefaultServingStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer),
}

cfg.StoreLimit[storeID] = sc
Expand Down
16 changes: 10 additions & 6 deletions pkg/schedule/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,14 @@ const (

var (
defaultLocationLabels = []string{}
// DefaultStoreLimit is the default store limit of add peer and remove peer.
DefaultStoreLimit = StoreLimit{AddPeer: 15, RemovePeer: 15}
// DefaultTiFlashStoreLimit is the default TiFlash store limit of add peer and remove peer.
DefaultTiFlashStoreLimit = StoreLimit{AddPeer: 30, RemovePeer: 30}
// DefaultPreparingStoreLimit is the default store limit of add peer and remove peer when preparing.
DefaultPreparingStoreLimit = StoreLimit{AddPeer: 100, RemovePeer: 100}
// DefaultPreparingTiFlashStoreLimit is the default TiFlash store limit of add peer and remove peer when preparing.
DefaultPreparingTiFlashStoreLimit = StoreLimit{AddPeer: 30, RemovePeer: 30}
// DefaultServingStoreLimit is the default store limit of add peer and remove peer when serving.
DefaultServingStoreLimit = StoreLimit{AddPeer: 15, RemovePeer: 15}
// DefaultServingTiFlashStoreLimit is the default TiFlash store limit of add peer and remove peer when serving.
DefaultServingTiFlashStoreLimit = StoreLimit{AddPeer: 30, RemovePeer: 30}
)

// StoreLimit is the default limit of adding peer and removing peer when putting stores.
Expand Down Expand Up @@ -378,7 +382,7 @@ func (c *ScheduleConfig) Adjust(meta *configutil.ConfigMetaData, reloading bool)
}

if c.StoreBalanceRate != 0 {
DefaultStoreLimit = StoreLimit{AddPeer: c.StoreBalanceRate, RemovePeer: c.StoreBalanceRate}
DefaultServingStoreLimit = StoreLimit{AddPeer: c.StoreBalanceRate, RemovePeer: c.StoreBalanceRate}
c.StoreBalanceRate = 0
}

Expand Down Expand Up @@ -442,7 +446,7 @@ func (c *ScheduleConfig) parseDeprecatedFlag(meta *configutil.ConfigMetaData, na
func (c *ScheduleConfig) MigrateDeprecatedFlags() {
c.DisableLearner = false
if c.StoreBalanceRate != 0 {
DefaultStoreLimit = StoreLimit{AddPeer: c.StoreBalanceRate, RemovePeer: c.StoreBalanceRate}
DefaultServingStoreLimit = StoreLimit{AddPeer: c.StoreBalanceRate, RemovePeer: c.StoreBalanceRate}
c.StoreBalanceRate = 0
}
for _, b := range c.migrateConfigurationMap() {
Expand Down
7 changes: 6 additions & 1 deletion pkg/storage/endpoint/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,12 @@ func (se *StorageEndpoint) LoadStores(f func(store *core.StoreInfo)) error {
if err != nil {
return err
}
newStoreInfo := core.NewStoreInfo(store, core.SetLeaderWeight(leaderWeight), core.SetRegionWeight(regionWeight))
var newStoreInfo *core.StoreInfo
if store.NodeState == metapb.NodeState_Preparing {
newStoreInfo = core.NewStoreInfo(store, core.SetLeaderWeight(leaderWeight), core.SetRegionWeight(regionWeight), core.PauseLeaderTransfer())
} else {
newStoreInfo = core.NewStoreInfo(store, core.SetLeaderWeight(leaderWeight), core.SetRegionWeight(regionWeight))
}

nextID = store.GetId() + 1
f(newStoreInfo)
Expand Down
51 changes: 31 additions & 20 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1276,7 +1276,7 @@ func (c *RaftCluster) PutStore(store *metapb.Store) error {
return err
}
c.OnStoreVersionChange()
c.AddStoreLimit(store)
c.UpdateStoreLimit(store)
return nil
}

Expand Down Expand Up @@ -1309,7 +1309,7 @@ func (c *RaftCluster) putStoreImpl(store *metapb.Store, force bool) error {
s := c.GetStore(store.GetId())
if s == nil {
// Add a new store.
s = core.NewStoreInfo(store)
s = core.NewStoreInfo(store, core.PauseLeaderTransfer())
} else {
// Use the given labels to update the store.
labels := store.GetLabels()
Expand Down Expand Up @@ -1660,12 +1660,13 @@ func (c *RaftCluster) ReadyToServe(storeID uint64) error {
return errs.ErrStoreServing.FastGenByArgs(storeID)
}

newStore := store.Clone(core.UpStore())
newStore := store.Clone(core.UpStore(), core.ResumeLeaderTransfer())
log.Info("store has changed to serving",
zap.Uint64("store-id", storeID),
zap.String("store-address", newStore.GetAddress()))
err := c.putStoreLocked(newStore)
if err == nil {
c.UpdateStoreLimit(store.GetMeta())
c.resetProgress(storeID, store.GetAddress())
}
return err
Expand Down Expand Up @@ -2326,22 +2327,32 @@ func (c *RaftCluster) GetAllStoresLimit() map[uint64]sc.StoreLimitConfig {
return c.opt.GetAllStoresLimit()
}

// AddStoreLimit add a store limit for a given store ID.
func (c *RaftCluster) AddStoreLimit(store *metapb.Store) {
// UpdateStoreLimit updates a store limit for a given store ID.
func (c *RaftCluster) UpdateStoreLimit(store *metapb.Store) {
storeID := store.GetId()
cfg := c.opt.GetScheduleConfig().Clone()
if _, ok := cfg.StoreLimit[storeID]; ok {
return
}

slc := sc.StoreLimitConfig{
AddPeer: sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer),
RemovePeer: sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer),
}
if core.IsStoreContainLabel(store, core.EngineKey, core.EngineTiFlash) {
var slc sc.StoreLimitConfig
if store.GetNodeState() == metapb.NodeState_Preparing {
slc = sc.StoreLimitConfig{
AddPeer: sc.DefaultTiFlashStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer),
RemovePeer: sc.DefaultTiFlashStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer),
AddPeer: sc.DefaultPreparingStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer),
RemovePeer: sc.DefaultPreparingStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer),
}
if core.IsStoreContainLabel(store, core.EngineKey, core.EngineTiFlash) {
slc = sc.StoreLimitConfig{
AddPeer: sc.DefaultPreparingTiFlashStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer),
RemovePeer: sc.DefaultPreparingTiFlashStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer),
}
}
} else {
slc = sc.StoreLimitConfig{
AddPeer: sc.DefaultServingStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer),
RemovePeer: sc.DefaultServingStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer),
}
if core.IsStoreContainLabel(store, core.EngineKey, core.EngineTiFlash) {
slc = sc.StoreLimitConfig{
AddPeer: sc.DefaultServingTiFlashStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer),
RemovePeer: sc.DefaultServingTiFlashStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer),
}
}
}

Expand Down Expand Up @@ -2518,14 +2529,14 @@ func (c *RaftCluster) SetStoreLimit(storeID uint64, typ storelimit.Type, ratePer
// SetAllStoresLimit sets all store limit for a given type and rate.
func (c *RaftCluster) SetAllStoresLimit(typ storelimit.Type, ratePerMin float64) error {
old := c.opt.GetScheduleConfig().Clone()
oldAdd := sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer)
oldRemove := sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer)
oldAdd := sc.DefaultServingStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer)
oldRemove := sc.DefaultServingStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer)
c.opt.SetAllStoresLimit(typ, ratePerMin)
if err := c.opt.Persist(c.storage); err != nil {
// roll back the store limit
c.opt.SetScheduleConfig(old)
sc.DefaultStoreLimit.SetDefaultStoreLimit(storelimit.AddPeer, oldAdd)
sc.DefaultStoreLimit.SetDefaultStoreLimit(storelimit.RemovePeer, oldRemove)
sc.DefaultServingStoreLimit.SetDefaultStoreLimit(storelimit.AddPeer, oldAdd)
sc.DefaultServingStoreLimit.SetDefaultStoreLimit(storelimit.RemovePeer, oldRemove)
log.Error("persist store limit meet error", errs.ZapError(err))
return err
}
Expand Down
12 changes: 6 additions & 6 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,14 +341,14 @@ func (o *PersistOptions) SetStoreLimit(storeID uint64, typ storelimit.Type, rate
switch typ {
case storelimit.AddPeer:
if _, ok := v.StoreLimit[storeID]; !ok {
rate = sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer)
rate = sc.DefaultServingStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer)
} else {
rate = v.StoreLimit[storeID].RemovePeer
}
slc = sc.StoreLimitConfig{AddPeer: ratePerMin, RemovePeer: rate}
case storelimit.RemovePeer:
if _, ok := v.StoreLimit[storeID]; !ok {
rate = sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer)
rate = sc.DefaultServingStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer)
} else {
rate = v.StoreLimit[storeID].AddPeer
}
Expand All @@ -363,13 +363,13 @@ func (o *PersistOptions) SetAllStoresLimit(typ storelimit.Type, ratePerMin float
v := o.GetScheduleConfig().Clone()
switch typ {
case storelimit.AddPeer:
sc.DefaultStoreLimit.SetDefaultStoreLimit(storelimit.AddPeer, ratePerMin)
sc.DefaultServingStoreLimit.SetDefaultStoreLimit(storelimit.AddPeer, ratePerMin)
for storeID := range v.StoreLimit {
sc := sc.StoreLimitConfig{AddPeer: ratePerMin, RemovePeer: v.StoreLimit[storeID].RemovePeer}
v.StoreLimit[storeID] = sc
}
case storelimit.RemovePeer:
sc.DefaultStoreLimit.SetDefaultStoreLimit(storelimit.RemovePeer, ratePerMin)
sc.DefaultServingStoreLimit.SetDefaultStoreLimit(storelimit.RemovePeer, ratePerMin)
for storeID := range v.StoreLimit {
sc := sc.StoreLimitConfig{AddPeer: v.StoreLimit[storeID].AddPeer, RemovePeer: ratePerMin}
v.StoreLimit[storeID] = sc
Expand Down Expand Up @@ -445,8 +445,8 @@ func (o *PersistOptions) GetStoreLimit(storeID uint64) (returnSC sc.StoreLimitCo
}
cfg := o.GetScheduleConfig().Clone()
sc := sc.StoreLimitConfig{
AddPeer: sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer),
RemovePeer: sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer),
AddPeer: sc.DefaultServingStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer),
RemovePeer: sc.DefaultServingStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer),
}
v, ok1, err := o.getTTLFloat("default-add-peer")
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion tools/pd-simulator/simulator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type Coprocessor struct {

// NewSimConfig create a new configuration of the simulator.
func NewSimConfig(serverLogLevel string) *SimConfig {
sc.DefaultStoreLimit = sc.StoreLimit{AddPeer: 2000, RemovePeer: 2000}
sc.DefaultServingStoreLimit = sc.StoreLimit{AddPeer: 2000, RemovePeer: 2000}
cfg := &config.Config{
Name: "pd",
ClientUrls: tempurl.Alloc(),
Expand Down

0 comments on commit 61d9710

Please sign in to comment.