Skip to content

Commit

Permalink
Merge branch 'master' into pd_tiflash_ru_metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Sep 21, 2023
2 parents a2cd62c + a997316 commit 8622eea
Show file tree
Hide file tree
Showing 18 changed files with 331 additions and 126 deletions.
40 changes: 23 additions & 17 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,9 +682,14 @@ func (r *RegionInfo) isRegionRecreated() bool {
return r.GetRegionEpoch().GetVersion() == 1 && r.GetRegionEpoch().GetConfVer() == 1 && (len(r.GetStartKey()) != 0 || len(r.GetEndKey()) != 0)
}

// RegionChanged records which follow-up operations a region update requires.
// The zero value means that no follow-up operation is needed.
type RegionChanged struct {
	// IsNew marks the region as new.
	IsNew bool
	// SaveKV marks that the region should be saved to storage.
	SaveKV bool
	// SaveCache marks that the region should be saved to the cache.
	SaveCache bool
	// NeedSync marks that the region change needs to be synced.
	NeedSync bool
}

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool)
type RegionGuideFunc func(region, origin *RegionInfo) *RegionChanged

// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
// nil means do not print the log.
Expand All @@ -697,18 +702,19 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
return func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) {
// Mark IsNew if the region in cache does not have leader.
return func(region, origin *RegionInfo) (changed *RegionChanged) {
changed = &RegionChanged{}
if origin == nil {
if log.GetLevel() <= zap.DebugLevel {
debug("insert new region",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
}
saveKV, saveCache, isNew = true, true, true
changed.SaveKV, changed.SaveCache, changed.IsNew = true, true, true
} else {
if !origin.IsFromHeartbeat() {
isNew = true
changed.IsNew = true
}
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
Expand All @@ -721,7 +727,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-version", r.GetVersion()),
)
}
saveKV, saveCache = true, true
changed.SaveKV, changed.SaveCache = true, true
}
if r.GetConfVer() > o.GetConfVer() {
if log.GetLevel() <= zap.InfoLevel {
Expand All @@ -732,11 +738,11 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-confver", r.GetConfVer()),
)
}
saveKV, saveCache = true, true
changed.SaveCache, changed.SaveKV = true, true
}
if region.GetLeader().GetId() != origin.GetLeader().GetId() {
if origin.GetLeader().GetId() == 0 {
isNew = true
changed.IsNew = true
} else if log.GetLevel() <= zap.InfoLevel {
info("leader changed",
zap.Uint64("region-id", region.GetID()),
Expand All @@ -745,57 +751,57 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
)
}
// We check it first and do not return because the log is important for us to investigate,
saveCache, needSync = true, true
changed.SaveCache, changed.NeedSync = true, true
}
if len(region.GetPeers()) != len(origin.GetPeers()) {
saveKV, saveCache = true, true
changed.SaveCache, changed.SaveKV = true, true
return
}
if len(region.GetBuckets().GetKeys()) != len(origin.GetBuckets().GetKeys()) {
if log.GetLevel() <= zap.DebugLevel {
debug("bucket key changed", zap.Uint64("region-id", region.GetID()))
}
saveKV, saveCache = true, true
changed.SaveCache, changed.SaveKV = true, true
return
}
// Once flow has changed, will update the cache.
// Because keys and bytes are strongly related, only bytes are judged.
if region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() ||
region.GetRoundBytesRead() != origin.GetRoundBytesRead() ||
region.flowRoundDivisor < origin.flowRoundDivisor {
saveCache, needSync = true, true
changed.SaveCache, changed.NeedSync = true, true
return
}
if !SortedPeersStatsEqual(region.GetDownPeers(), origin.GetDownPeers()) {
if log.GetLevel() <= zap.DebugLevel {
debug("down-peers changed", zap.Uint64("region-id", region.GetID()))
}
saveCache, needSync = true, true
changed.SaveCache, changed.NeedSync = true, true
return
}
if !SortedPeersEqual(region.GetPendingPeers(), origin.GetPendingPeers()) {
if log.GetLevel() <= zap.DebugLevel {
debug("pending-peers changed", zap.Uint64("region-id", region.GetID()))
}
saveCache, needSync = true, true
changed.SaveCache, changed.NeedSync = true, true
return
}
if region.GetApproximateSize() != origin.GetApproximateSize() ||
region.GetApproximateKeys() != origin.GetApproximateKeys() {
saveCache = true
changed.SaveCache = true
return
}
if region.GetReplicationStatus().GetState() != replication_modepb.RegionReplicationState_UNKNOWN &&
(region.GetReplicationStatus().GetState() != origin.GetReplicationStatus().GetState() ||
region.GetReplicationStatus().GetStateId() != origin.GetReplicationStatus().GetStateId()) {
saveCache = true
changed.SaveCache = true
return
}
// Do not save to kv, because 1) flashback will be eventually set to
// false, 2) flashback changes almost all regions in a cluster.
// Saving kv may downgrade PD performance when there are many regions.
if region.IsFlashbackChanged(origin) {
saveCache = true
changed.SaveCache = true
return
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ func TestNeedSync(t *testing.T) {
for _, testCase := range testCases {
regionA := region.Clone(testCase.optionsA...)
regionB := region.Clone(testCase.optionsB...)
_, _, _, needSync := RegionGuide(regionA, regionB)
re.Equal(testCase.needSync, needSync)
changed := RegionGuide(regionA, regionB)
re.Equal(testCase.needSync, changed.NeedSync)
}
}

Expand Down
39 changes: 23 additions & 16 deletions pkg/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
initialMinSpace = 8 * units.GiB // 2^33=8GB
slowStoreThreshold = 80
awakenStoreInterval = 10 * time.Minute // 2 * slowScoreRecoveryTime
splitStoreWait = time.Minute

// EngineKey is the label key used to indicate engine.
EngineKey = "engine"
Expand All @@ -50,22 +51,23 @@ const (
type StoreInfo struct {
meta *metapb.Store
*storeStats
pauseLeaderTransfer bool // not allow to be used as source or target of transfer leader
slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it
slowTrendEvicted bool // this store has been evicted as a slow store by trend, should not transfer leader to it
leaderCount int
regionCount int
learnerCount int
witnessCount int
leaderSize int64
regionSize int64
pendingPeerCount int
lastPersistTime time.Time
leaderWeight float64
regionWeight float64
limiter storelimit.StoreLimit
minResolvedTS uint64
lastAwakenTime time.Time
pauseLeaderTransfer bool // not allow to be used as source or target of transfer leader
slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it
slowTrendEvicted bool // this store has been evicted as a slow store by trend, should not transfer leader to it
leaderCount int
regionCount int
learnerCount int
witnessCount int
leaderSize int64
regionSize int64
pendingPeerCount int
lastPersistTime time.Time
leaderWeight float64
regionWeight float64
limiter storelimit.StoreLimit
minResolvedTS uint64
lastAwakenTime time.Time
recentlySplitRegionsTime time.Time
}

// NewStoreInfo creates StoreInfo with meta data.
Expand Down Expand Up @@ -539,6 +541,11 @@ func (s *StoreInfo) NeedAwakenStore() bool {
return s.GetLastHeartbeatTS().Sub(s.lastAwakenTime) > awakenStoreInterval
}

// HasRecentlySplitRegions reports whether the time elapsed since the store's
// recorded recentlySplitRegionsTime is shorter than splitStoreWait.
func (s *StoreInfo) HasRecentlySplitRegions() bool {
	sinceLastSplit := time.Since(s.recentlySplitRegionsTime)
	return sinceLastSplit < splitStoreWait
}

var (
// If a store's last heartbeat is storeDisconnectDuration ago, the store will
// be marked as disconnected state. The value should be greater than tikv's
Expand Down
7 changes: 7 additions & 0 deletions pkg/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,10 @@ func SetLastAwakenTime(lastAwaken time.Time) StoreCreateOption {
store.lastAwakenTime = lastAwaken
}
}

// SetRecentlySplitRegionsTime returns a StoreCreateOption that stores the given
// time in the store's recentlySplitRegionsTime field.
func SetRecentlySplitRegionsTime(recentlySplitRegionsTime time.Time) StoreCreateOption {
	applySplitTime := func(s *StoreInfo) {
		s.recentlySplitRegionsTime = recentlySplitRegionsTime
	}
	return applySplitTime
}
8 changes: 4 additions & 4 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,8 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
isNew, _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin)
if !saveCache && !isNew {
changed := core.GenerateRegionGuideFunc(true)(region, origin)
if !changed.SaveCache && !changed.IsNew {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
Expand All @@ -444,7 +444,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
}

var overlaps []*core.RegionInfo
if saveCache {
if changed.SaveCache {
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
// check its validation again here.
//
Expand All @@ -456,7 +456,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
cluster.HandleOverlaps(c, overlaps)
}

cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared())
cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, changed.IsNew, c.IsPrepared())
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/schedule/filter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ const (
storeStateTooManyPendingPeer
storeStateRejectLeader
storeStateSlowTrend
storeStateRecentlySplitRegions

filtersLen
)
Expand Down Expand Up @@ -156,6 +157,7 @@ var filters = [filtersLen]string{
"store-state-too-many-pending-peers-filter",
"store-state-reject-leader-filter",
"store-state-slow-trend-filter",
"store-state-recently-split-regions-filter",
}

// String implements fmt.Stringer interface.
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/filter/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestString(t *testing.T) {
expected string
}{
{int(storeStateTombstone), "store-state-tombstone-filter"},
{int(filtersLen - 1), "store-state-slow-trend-filter"},
{int(filtersLen - 1), "store-state-recently-split-regions-filter"},
{int(filtersLen), "unknown"},
}

Expand Down
13 changes: 12 additions & 1 deletion pkg/schedule/filter/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ type StoreStateFilter struct {
// If it checks failed, the operator will be put back to the waiting queue util the limit is available.
// But the scheduler should keep the same with the operator level.
OperatorLevel constant.PriorityLevel
// ForbidRecentlySplitRegions, if set to true, rejects a store that has recently split regions.
ForbidRecentlySplitRegions bool
// Reason is used to distinguish the reason of store state filter
Reason filterType
}
Expand Down Expand Up @@ -471,6 +473,15 @@ func (f *StoreStateFilter) hasRejectLeaderProperty(conf config.SharedConfigProvi
return statusOK
}

// hasRecentlySplitRegions rejects the store only when ForbidRecentlySplitRegions
// is set and the store reports recently split regions; otherwise it returns
// statusOK. In both cases f.Reason is updated to match the returned status.
func (f *StoreStateFilter) hasRecentlySplitRegions(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status {
	// The store is only queried when the check is enabled.
	if !f.ForbidRecentlySplitRegions || !store.HasRecentlySplitRegions() {
		f.Reason = storeStateOK
		return statusOK
	}
	f.Reason = storeStateRecentlySplitRegions
	return statusStoreRecentlySplitRegions
}

// The condition table.
// Y: the condition is temporary (expected to become false soon).
// N: the condition is expected to be true for a long time.
Expand Down Expand Up @@ -499,7 +510,7 @@ func (f *StoreStateFilter) anyConditionMatch(typ int, conf config.SharedConfigPr
var funcs []conditionFunc
switch typ {
case leaderSource:
funcs = []conditionFunc{f.isRemoved, f.isDown, f.pauseLeaderTransfer, f.isDisconnected}
funcs = []conditionFunc{f.isRemoved, f.isDown, f.pauseLeaderTransfer, f.isDisconnected, f.hasRecentlySplitRegions}
case regionSource:
funcs = []conditionFunc{f.isBusy, f.exceedRemoveLimit, f.tooManySnapshots}
case witnessSource:
Expand Down
45 changes: 26 additions & 19 deletions pkg/schedule/filter/region_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,6 @@ import (
"github.com/tikv/pd/pkg/slice"
)

// SelectRegions returns the regions from the list that are accepted by every given filter.
func SelectRegions(regions []*core.RegionInfo, filters ...RegionFilter) []*core.RegionInfo {
	passesAll := func(candidate *core.RegionInfo) bool {
		accepted := func(idx int) bool {
			return filters[idx].Select(candidate).IsOK()
		}
		return slice.AllOf(filters, accepted)
	}
	return filterRegionsBy(regions, passesAll)
}

// filterRegionsBy returns, in their original order, the regions for which
// keepPred reports true. The result is nil when no region is kept.
func filterRegionsBy(regions []*core.RegionInfo, keepPred func(*core.RegionInfo) bool) (selected []*core.RegionInfo) {
	for idx := range regions {
		if region := regions[idx]; keepPred(region) {
			selected = append(selected, region)
		}
	}
	return selected
}

// SelectOneRegion selects one region that be selected from the list.
func SelectOneRegion(regions []*core.RegionInfo, collector *plan.Collector, filters ...RegionFilter) *core.RegionInfo {
for _, r := range regions {
Expand Down Expand Up @@ -173,7 +155,7 @@ type SnapshotSenderFilter struct {
senders map[uint64]struct{}
}

// NewSnapshotSendFilter returns creates a RegionFilter that filters regions with witness peer on the specific store.
// NewSnapshotSendFilter returns creates a RegionFilter that filters regions whose leader has sender limit on the specific store.
// level should be set as same with the operator priority level.
func NewSnapshotSendFilter(stores []*core.StoreInfo, level constant.PriorityLevel) RegionFilter {
senders := make(map[uint64]struct{})
Expand All @@ -193,3 +175,28 @@ func (f *SnapshotSenderFilter) Select(region *core.RegionInfo) *plan.Status {
}
return statusRegionLeaderSendSnapshotThrottled
}

// StoreRecentlySplitFilter filters out the regions whose leader is located on a
// store that has recently split regions.
type StoreRecentlySplitFilter struct {
// recentlySplitStores is the set of IDs of the stores treated as having recently split regions.
recentlySplitStores map[uint64]struct{}
}

// NewStoreRecentlySplitFilter creates a StoreRecentlySplitFilter holding the IDs
// of the given stores that report recently split regions at construction time.
func NewStoreRecentlySplitFilter(stores []*core.StoreInfo) RegionFilter {
	filter := &StoreRecentlySplitFilter{recentlySplitStores: make(map[uint64]struct{})}
	for idx := range stores {
		if s := stores[idx]; s.HasRecentlySplitRegions() {
			filter.recentlySplitStores[s.GetID()] = struct{}{}
		}
	}
	return filter
}

// Select returns statusOK unless the store of the region's leader is in
// recentlySplitStores, in which case it returns statusStoreRecentlySplitRegions.
func (f *StoreRecentlySplitFilter) Select(region *core.RegionInfo) *plan.Status {
	_, recentlySplit := f.recentlySplitStores[region.GetLeader().GetStoreId()]
	if !recentlySplit {
		return statusOK
	}
	return statusStoreRecentlySplitRegions
}
5 changes: 3 additions & 2 deletions pkg/schedule/filter/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ var (
// store config limitation
statusStoreRejectLeader = plan.NewStatus(plan.StatusStoreRejectLeader)

statusStoreNotMatchRule = plan.NewStatus(plan.StatusStoreNotMatchRule)
statusStoreNotMatchIsolation = plan.NewStatus(plan.StatusStoreNotMatchIsolation)
statusStoreNotMatchRule = plan.NewStatus(plan.StatusStoreNotMatchRule)
statusStoreNotMatchIsolation = plan.NewStatus(plan.StatusStoreNotMatchIsolation)
statusStoreRecentlySplitRegions = plan.NewStatus(plan.StatusStoreRecentlySplitRegions)

// region filter status
statusRegionPendingPeer = plan.NewStatus(plan.StatusRegionUnhealthy)
Expand Down
5 changes: 4 additions & 1 deletion pkg/schedule/plan/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ const (
StatusStoreLowSpace = iota + 500
// StatusStoreNotExisted represents the store cannot be found in PD.
StatusStoreNotExisted
// StatusStoreRecentlySplitRegions represents that the store cannot be selected because it has recently split regions.
StatusStoreRecentlySplitRegions
)

// TODO: define region status priority
Expand Down Expand Up @@ -127,7 +129,8 @@ var statusText = map[StatusCode]string{
StatusStoreDown: "StoreDown",
StatusStoreBusy: "StoreBusy",

StatusStoreNotExisted: "StoreNotExisted",
StatusStoreNotExisted: "StoreNotExisted",
StatusStoreRecentlySplitRegions: "StoreRecentlySplitRegions",

// region
StatusRegionHot: "RegionHot",
Expand Down
Loading

0 comments on commit 8622eea

Please sign in to comment.