disable PD scheduling when enabling scheduling service
Signed-off-by: Ryan Leung <[email protected]>
rleungx committed Sep 4, 2023
1 parent 91648e5 commit b1ce492
Showing 4 changed files with 130 additions and 85 deletions.
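The commit threads a new isAPIServiceMode flag through RaftCluster and skips PD's own coordinator, statistics, and hot-region background work when the standalone scheduling service is enabled. Below is a minimal, self-contained sketch of that gating pattern; the cluster type and job names are simplified stand-ins for illustration, not the real PD types.

package main

import (
    "fmt"
    "sync"
)

// cluster is a simplified stand-in for RaftCluster: just the flag and the
// wait group needed to show how background jobs are gated.
type cluster struct {
    isAPIServiceMode bool
    wg               sync.WaitGroup
}

func (c *cluster) runJob(name string) {
    defer c.wg.Done()
    fmt.Println("running", name)
}

// start launches scheduling-related jobs only when PD still owns the
// scheduling role (not in API service mode); shared jobs always run.
func (c *cluster) start() {
    if !c.isAPIServiceMode {
        c.wg.Add(3)
        go c.runJob("coordinator")
        go c.runJob("stats-background-jobs")
        go c.runJob("metrics-collection")
    }
    c.wg.Add(1)
    go c.runJob("node-state-check")
}

func main() {
    c := &cluster{isAPIServiceMode: true}
    c.start()
    c.wg.Wait() // only the shared job runs when the scheduling service is enabled
}
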
2 changes: 2 additions & 0 deletions pkg/mcs/scheduling/server/server.go
@@ -45,6 +45,7 @@ import (
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/hbstream"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/apiutil"
@@ -438,6 +439,7 @@ func CreateServer(ctx context.Context, cfg *config.Config) *Server {

// CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server
func CreateServerWrapper(cmd *cobra.Command, args []string) {
schedulers.Register()
cmd.Flags().Parse(args)
cfg := config.NewConfig()
flagSet := cmd.Flags()
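The server.go hunk registers the scheduler constructors before the scheduling service parses its flags, so the standalone service knows every scheduler type at startup. A rough sketch of such a register-once pattern follows, assuming a simple name-to-constructor map; the real schedulers.Register has its own registry and signature.

package registry

import (
    "fmt"
    "sync"
)

// Scheduler is a placeholder interface for the constructors being registered.
type Scheduler interface{ Name() string }

type balanceLeader struct{}

func (balanceLeader) Name() string { return "balance-leader" }

var (
    once     sync.Once
    builders = map[string]func() Scheduler{}
)

// Register installs the known scheduler constructors exactly once, so both
// the PD server and the standalone scheduling service can call it at startup.
func Register() {
    once.Do(func() {
        builders["balance-leader"] = func() Scheduler { return balanceLeader{} }
    })
}

// Build looks up a registered constructor by name.
func Build(name string) (Scheduler, error) {
    b, ok := builders[name]
    if !ok {
        return nil, fmt.Errorf("scheduler %q not registered", name)
    }
    return b(), nil
}
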
11 changes: 7 additions & 4 deletions pkg/statistics/store_collection.go
@@ -61,7 +61,7 @@ func newStoreStatistics(opt *config.PersistOptions) *storeStatistics {
}
}

func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) {
func (s *storeStatistics) Observe(store *core.StoreInfo, stats ...*StoresStats) {
for _, k := range s.opt.GetLocationLabels() {
v := store.GetLabelValue(k)
if v == "" {
@@ -147,8 +147,11 @@ func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) {
storeStatusGauge.WithLabelValues(storeAddress, id, "store_slow_trend_result_rate").Set(slowTrend.ResultRate)
}

if len(stats) == 0 {
return
}
// Store flows.
storeFlowStats := stats.GetRollingStoreStats(store.GetID())
storeFlowStats := stats[0].GetRollingStoreStats(store.GetID())
if storeFlowStats == nil {
return
}
@@ -298,8 +301,8 @@ func NewStoreStatisticsMap(opt *config.PersistOptions) *storeStatisticsMap {
}
}

func (m *storeStatisticsMap) Observe(store *core.StoreInfo, stats *StoresStats) {
m.stats.Observe(store, stats)
func (m *storeStatisticsMap) Observe(store *core.StoreInfo, stats ...*StoresStats) {
m.stats.Observe(store, stats...)
}

func (m *storeStatisticsMap) Collect() {
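The store_collection.go hunks turn the stats argument into a variadic parameter so callers that have no flow statistics (the API-service-mode metrics path) can simply omit it. A small stand-alone illustration of the same pattern, with toy types standing in for StoreInfo and StoresStats:

package main

import "fmt"

type storeInfo struct{ id uint64 }

type storesStats struct{ bytesWritten float64 }

// observe records basic store status, and flow metrics only when the caller
// supplies stats (len(stats) > 0), mirroring the variadic signature change.
func observe(store storeInfo, stats ...*storesStats) {
    fmt.Println("status for store", store.id)
    if len(stats) == 0 {
        return // no flow statistics available on this path
    }
    fmt.Println("write bytes:", stats[0].bytesWritten)
}

func main() {
    s := storeInfo{id: 1}
    observe(s)                                 // without flow statistics
    observe(s, &storesStats{bytesWritten: 42}) // with flow statistics
}
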
198 changes: 118 additions & 80 deletions server/cluster/cluster.go
@@ -143,11 +143,12 @@ type RaftCluster struct {
etcdClient *clientv3.Client
httpClient *http.Client

running bool
meta *metapb.Cluster
storage storage.Storage
minResolvedTS uint64
externalTS uint64
running bool
isAPIServiceMode bool
meta *metapb.Cluster
storage storage.Storage
minResolvedTS uint64
externalTS uint64

// Keep the previous store limit settings when removing a store.
prevStoreLimit map[uint64]map[storelimit.Type]float64
@@ -287,6 +288,7 @@ func (c *RaftCluster) Start(s Server) error {
return nil
}

c.isAPIServiceMode = s.IsAPIServiceMode()
c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetBasicCluster(), s.GetKeyspaceGroupManager())
cluster, err := c.LoadClusterInfo()
if err != nil {
@@ -312,6 +314,7 @@ func (c *RaftCluster) Start(s Server) error {
if err != nil {
return err
}

c.coordinator = schedule.NewCoordinator(c.ctx, cluster, s.GetHBStreams())
c.regionStats = statistics.NewRegionStatistics(c.core, c.opt, c.ruleManager)
c.limiter = NewStoreLimiter(s.GetPersistOptions())
@@ -329,11 +332,14 @@ func (c *RaftCluster) Start(s Server) error {
}
}

c.wg.Add(10)
go c.runCoordinator()
go c.runMetricsCollectionJob()
if !s.IsAPIServiceMode() {
c.wg.Add(3)
go c.runCoordinator()
go c.runStatsBackgroundJobs()
go c.runMetricsCollectionJob()
}
c.wg.Add(7)
go c.runNodeStateCheckJob()
go c.runStatsBackgroundJobs()
go c.syncRegions()
go c.runReplicationMode()
go c.runMinResolvedTSJob()
@@ -590,8 +596,10 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
)
for _, store := range c.GetStores() {
storeID := store.GetID()
c.hotStat.GetOrCreateRollingStoreStats(storeID)
c.slowStat.ObserveSlowStoreStatus(storeID, store.IsSlow())
if !c.isAPIServiceMode {
c.hotStat.GetOrCreateRollingStoreStats(storeID)
c.slowStat.ObserveSlowStoreStatus(storeID, store.IsSlow())
}
}
return c, nil
}
@@ -713,7 +721,9 @@ func (c *RaftCluster) Stop() {
return
}
c.running = false
c.coordinator.Stop()
if !c.isAPIServiceMode {
c.coordinator.Stop()
}
c.cancel()
c.Unlock()

@@ -934,11 +944,15 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest
nowTime := time.Now()
var newStore *core.StoreInfo
// If this cluster has slow stores, we should awaken hibernated regions in other stores.
if needAwaken, slowStoreIDs := c.NeedAwakenAllRegionsInStore(storeID); needAwaken {
log.Info("forcely awaken hibernated regions", zap.Uint64("store-id", storeID), zap.Uint64s("slow-stores", slowStoreIDs))
newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), core.SetLastAwakenTime(nowTime), opt)
resp.AwakenRegions = &pdpb.AwakenRegions{
AbnormalStores: slowStoreIDs,
if !c.isAPIServiceMode {
if needAwaken, slowStoreIDs := c.NeedAwakenAllRegionsInStore(storeID); needAwaken {
log.Info("forcely awaken hibernated regions", zap.Uint64("store-id", storeID), zap.Uint64s("slow-stores", slowStoreIDs))
newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), core.SetLastAwakenTime(nowTime), opt)
resp.AwakenRegions = &pdpb.AwakenRegions{
AbnormalStores: slowStoreIDs,
}
} else {
newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), opt)
}
} else {
newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), opt)
@@ -961,41 +975,47 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest
statistics.UpdateStoreHeartbeatMetrics(store)
}
c.core.PutStore(newStore)
c.hotStat.Observe(storeID, newStore.GetStoreStats())
c.hotStat.FilterUnhealthyStore(c)
c.slowStat.ObserveSlowStoreStatus(storeID, newStore.IsSlow())
reportInterval := stats.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()

regions := make(map[uint64]*core.RegionInfo, len(stats.GetPeerStats()))
for _, peerStat := range stats.GetPeerStats() {
regionID := peerStat.GetRegionId()
region := c.GetRegion(regionID)
regions[regionID] = region
if region == nil {
log.Warn("discard hot peer stat for unknown region",
zap.Uint64("region-id", regionID),
zap.Uint64("store-id", storeID))
continue
}
peer := region.GetStorePeer(storeID)
if peer == nil {
log.Warn("discard hot peer stat for unknown region peer",
zap.Uint64("region-id", regionID),
zap.Uint64("store-id", storeID))
continue
}
readQueryNum := core.GetReadQueryNum(peerStat.GetQueryStats())
loads := []float64{
utils.RegionReadBytes: float64(peerStat.GetReadBytes()),
utils.RegionReadKeys: float64(peerStat.GetReadKeys()),
utils.RegionReadQueryNum: float64(readQueryNum),
utils.RegionWriteBytes: 0,
utils.RegionWriteKeys: 0,
utils.RegionWriteQueryNum: 0,
var (
regions map[uint64]*core.RegionInfo
interval uint64
)
if !c.isAPIServiceMode {
c.hotStat.Observe(storeID, newStore.GetStoreStats())
c.hotStat.FilterUnhealthyStore(c)
c.slowStat.ObserveSlowStoreStatus(storeID, newStore.IsSlow())
reportInterval := stats.GetInterval()
interval = reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()

regions = make(map[uint64]*core.RegionInfo, len(stats.GetPeerStats()))
for _, peerStat := range stats.GetPeerStats() {
regionID := peerStat.GetRegionId()
region := c.GetRegion(regionID)
regions[regionID] = region
if region == nil {
log.Warn("discard hot peer stat for unknown region",
zap.Uint64("region-id", regionID),
zap.Uint64("store-id", storeID))
continue
}
peer := region.GetStorePeer(storeID)
if peer == nil {
log.Warn("discard hot peer stat for unknown region peer",
zap.Uint64("region-id", regionID),
zap.Uint64("store-id", storeID))
continue
}
readQueryNum := core.GetReadQueryNum(peerStat.GetQueryStats())
loads := []float64{
utils.RegionReadBytes: float64(peerStat.GetReadBytes()),
utils.RegionReadKeys: float64(peerStat.GetReadKeys()),
utils.RegionReadQueryNum: float64(readQueryNum),
utils.RegionWriteBytes: 0,
utils.RegionWriteKeys: 0,
utils.RegionWriteQueryNum: 0,
}
peerInfo := core.NewPeerInfo(peer, loads, interval)
c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region))
}
peerInfo := core.NewPeerInfo(peer, loads, interval)
c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region))
}
for _, stat := range stats.GetSnapshotStats() {
// the duration of snapshot is the sum between to send and generate snapshot.
@@ -1015,8 +1035,10 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest
e := int64(dur)*2 - int64(stat.GetTotalDurationSec())
store.Feedback(float64(e))
}
// Here we will compare the reported regions with the previous hot peers to decide if it is still hot.
c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval))
if !c.isAPIServiceMode {
// Here we will compare the reported regions with the previous hot peers to decide if it is still hot.
c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval))
}
return nil
}

@@ -1064,22 +1086,24 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
}
region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket())

c.hotStat.CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
c.hotStat.CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
c.hotStat.CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region))
if !c.isAPIServiceMode {
c.hotStat.CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
c.hotStat.CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
c.hotStat.CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region))
}
c.coordinator.GetSchedulersController().CheckTransferWitnessLeader(region)
}
c.coordinator.GetSchedulersController().CheckTransferWitnessLeader(region)

hasRegionStats := c.regionStats != nil
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
isNew, saveKV, saveCache, needSync := regionGuide(region, origin)
if !saveKV && !saveCache && !isNew {
if !c.isAPIServiceMode && !saveKV && !saveCache && !isNew {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
@@ -1106,23 +1130,25 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
}

for _, item := range overlaps {
if c.regionStats != nil {
c.regionStats.ClearDefunctRegion(item.GetID())
if !c.isAPIServiceMode {
if c.regionStats != nil {
c.regionStats.ClearDefunctRegion(item.GetID())
}
c.labelLevelStats.ClearDefunctRegion(item.GetID())
}
c.labelLevelStats.ClearDefunctRegion(item.GetID())
c.ruleManager.InvalidCache(item.GetID())
}
regionUpdateCacheEventCounter.Inc()
}

if hasRegionStats {
c.regionStats.Observe(region, c.getRegionStoresLocked(region))
if !c.isAPIServiceMode {
if hasRegionStats {
c.regionStats.Observe(region, c.getRegionStoresLocked(region))
}
}

if !c.IsPrepared() && isNew {
c.coordinator.GetPrepareChecker().Collect(region)
}

if c.storage != nil {
// If there are concurrent heartbeats from the same region, the last write will win even if
// writes to storage in the critical area. So don't use mutex to protect it.
@@ -1598,8 +1624,10 @@ func (c *RaftCluster) BuryStore(storeID uint64, forceBury bool) error {
delete(c.prevStoreLimit, storeID)
c.RemoveStoreLimit(storeID)
c.resetProgress(storeID, store.GetAddress())
c.hotStat.RemoveRollingStoreStats(storeID)
c.slowStat.RemoveSlowStoreStatus(storeID)
if !c.isAPIServiceMode {
c.hotStat.RemoveRollingStoreStats(storeID)
c.slowStat.RemoveSlowStoreStatus(storeID)
}
}
return err
}
@@ -1773,8 +1801,10 @@ func (c *RaftCluster) putStoreLocked(store *core.StoreInfo) error {
}
}
c.core.PutStore(store)
c.hotStat.GetOrCreateRollingStoreStats(store.GetID())
c.slowStat.ObserveSlowStoreStatus(store.GetID(), store.IsSlow())
if !c.isAPIServiceMode {
c.hotStat.GetOrCreateRollingStoreStats(store.GetID())
c.slowStat.ObserveSlowStoreStatus(store.GetID(), store.IsSlow())
}
return nil
}

@@ -2124,23 +2154,31 @@ func (c *RaftCluster) collectMetrics() {
statsMap := statistics.NewStoreStatisticsMap(c.opt)
stores := c.GetStores()
for _, s := range stores {
statsMap.Observe(s, c.hotStat.StoresStats)
if !c.isAPIServiceMode {
statsMap.Observe(s, c.hotStat.StoresStats)
} else {
statsMap.Observe(s)
}
}
statsMap.Collect()

c.coordinator.GetSchedulersController().CollectSchedulerMetrics()
c.coordinator.CollectHotSpotMetrics()
c.collectClusterMetrics()
if !c.isAPIServiceMode {
c.coordinator.GetSchedulersController().CollectSchedulerMetrics()
c.coordinator.CollectHotSpotMetrics()
c.collectClusterMetrics()
}
c.collectHealthStatus()
}

func (c *RaftCluster) resetMetrics() {
statsMap := statistics.NewStoreStatisticsMap(c.opt)
statsMap.Reset()

c.coordinator.GetSchedulersController().ResetSchedulerMetrics()
c.coordinator.ResetHotSpotMetrics()
c.resetClusterMetrics()
if !c.isAPIServiceMode {
c.coordinator.GetSchedulersController().ResetSchedulerMetrics()
c.coordinator.ResetHotSpotMetrics()
c.resetClusterMetrics()
}
c.resetHealthStatus()
c.resetProgressIndicator()
}
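One detail worth noting in the cluster.go hunks: Start still constructs the coordinator, but only runs its loop when PD keeps the scheduling role, and Stop is gated by the same flag so it never stops a loop that was never started. A toy illustration of that start/stop symmetry (types simplified, not the real coordinator API):

package main

import "fmt"

// coordinator stands in for the scheduling coordinator owned by RaftCluster.
type coordinator struct{ running bool }

func (co *coordinator) run()  { co.running = true; fmt.Println("coordinator started") }
func (co *coordinator) stop() { co.running = false; fmt.Println("coordinator stopped") }

type raftCluster struct {
    isAPIServiceMode bool
    co               *coordinator
}

// start always builds the coordinator object but only runs it when PD still
// does the scheduling itself.
func (c *raftCluster) start() {
    c.co = &coordinator{}
    if !c.isAPIServiceMode {
        c.co.run()
    }
}

// stop uses the same condition, so nothing is stopped that never ran.
func (c *raftCluster) stop() {
    if !c.isAPIServiceMode {
        c.co.stop()
    }
}

func main() {
    c := &raftCluster{isAPIServiceMode: true}
    c.start()
    c.stop() // no-op: scheduling runs in the separate service
}
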
4 changes: 3 additions & 1 deletion server/cluster/cluster_worker.go
@@ -250,6 +250,8 @@ func (c *RaftCluster) HandleReportBuckets(b *metapb.Buckets) error {
if err := c.processReportBuckets(b); err != nil {
return err
}
c.hotStat.CheckAsync(buckets.NewCheckPeerTask(b))
if !c.isAPIServiceMode {
c.hotStat.CheckAsync(buckets.NewCheckPeerTask(b))
}
return nil
}
