From b1ce49282df7a5d5e617cbbaf8bdc41a2741ace3 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 4 Sep 2023 16:13:28 +0800 Subject: [PATCH] disable PD scheduling when enabling scheduling service Signed-off-by: Ryan Leung --- pkg/mcs/scheduling/server/server.go | 2 + pkg/statistics/store_collection.go | 11 +- server/cluster/cluster.go | 198 +++++++++++++++++----------- server/cluster/cluster_worker.go | 4 +- 4 files changed, 130 insertions(+), 85 deletions(-) diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 0b2ad56a0e40..a4a71bbef7a3 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -45,6 +45,7 @@ import ( "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/hbstream" + "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/apiutil" @@ -438,6 +439,7 @@ func CreateServer(ctx context.Context, cfg *config.Config) *Server { // CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server func CreateServerWrapper(cmd *cobra.Command, args []string) { + schedulers.Register() cmd.Flags().Parse(args) cfg := config.NewConfig() flagSet := cmd.Flags() diff --git a/pkg/statistics/store_collection.go b/pkg/statistics/store_collection.go index 2b695ed29230..9a5570e3cf54 100644 --- a/pkg/statistics/store_collection.go +++ b/pkg/statistics/store_collection.go @@ -61,7 +61,7 @@ func newStoreStatistics(opt *config.PersistOptions) *storeStatistics { } } -func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) { +func (s *storeStatistics) Observe(store *core.StoreInfo, stats ...*StoresStats) { for _, k := range s.opt.GetLocationLabels() { v := store.GetLabelValue(k) if v == "" { @@ -147,8 +147,11 @@ func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) { storeStatusGauge.WithLabelValues(storeAddress, id, "store_slow_trend_result_rate").Set(slowTrend.ResultRate) } + if len(stats) == 0 { + return + } // Store flows. - storeFlowStats := stats.GetRollingStoreStats(store.GetID()) + storeFlowStats := stats[0].GetRollingStoreStats(store.GetID()) if storeFlowStats == nil { return } @@ -298,8 +301,8 @@ func NewStoreStatisticsMap(opt *config.PersistOptions) *storeStatisticsMap { } } -func (m *storeStatisticsMap) Observe(store *core.StoreInfo, stats *StoresStats) { - m.stats.Observe(store, stats) +func (m *storeStatisticsMap) Observe(store *core.StoreInfo, stats ...*StoresStats) { + m.stats.Observe(store, stats...) } func (m *storeStatisticsMap) Collect() { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 36bec2fc1c16..fcb718319185 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -143,11 +143,12 @@ type RaftCluster struct { etcdClient *clientv3.Client httpClient *http.Client - running bool - meta *metapb.Cluster - storage storage.Storage - minResolvedTS uint64 - externalTS uint64 + running bool + isAPIServiceMode bool + meta *metapb.Cluster + storage storage.Storage + minResolvedTS uint64 + externalTS uint64 // Keep the previous store limit settings when removing a store. 
prevStoreLimit map[uint64]map[storelimit.Type]float64 @@ -287,6 +288,7 @@ func (c *RaftCluster) Start(s Server) error { return nil } + c.isAPIServiceMode = s.IsAPIServiceMode() c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetBasicCluster(), s.GetKeyspaceGroupManager()) cluster, err := c.LoadClusterInfo() if err != nil { @@ -312,6 +314,7 @@ func (c *RaftCluster) Start(s Server) error { if err != nil { return err } + c.coordinator = schedule.NewCoordinator(c.ctx, cluster, s.GetHBStreams()) c.regionStats = statistics.NewRegionStatistics(c.core, c.opt, c.ruleManager) c.limiter = NewStoreLimiter(s.GetPersistOptions()) @@ -329,11 +332,14 @@ func (c *RaftCluster) Start(s Server) error { } } - c.wg.Add(10) - go c.runCoordinator() - go c.runMetricsCollectionJob() + if !s.IsAPIServiceMode() { + c.wg.Add(3) + go c.runCoordinator() + go c.runStatsBackgroundJobs() + go c.runMetricsCollectionJob() + } + c.wg.Add(7) go c.runNodeStateCheckJob() - go c.runStatsBackgroundJobs() go c.syncRegions() go c.runReplicationMode() go c.runMinResolvedTSJob() @@ -590,8 +596,10 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) { ) for _, store := range c.GetStores() { storeID := store.GetID() - c.hotStat.GetOrCreateRollingStoreStats(storeID) - c.slowStat.ObserveSlowStoreStatus(storeID, store.IsSlow()) + if !c.isAPIServiceMode { + c.hotStat.GetOrCreateRollingStoreStats(storeID) + c.slowStat.ObserveSlowStoreStatus(storeID, store.IsSlow()) + } } return c, nil } @@ -713,7 +721,9 @@ func (c *RaftCluster) Stop() { return } c.running = false - c.coordinator.Stop() + if !c.isAPIServiceMode { + c.coordinator.Stop() + } c.cancel() c.Unlock() @@ -934,11 +944,15 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest nowTime := time.Now() var newStore *core.StoreInfo // If this cluster has slow stores, we should awaken hibernated regions in other stores. 
-	if needAwaken, slowStoreIDs := c.NeedAwakenAllRegionsInStore(storeID); needAwaken {
-		log.Info("forcely awaken hibernated regions", zap.Uint64("store-id", storeID), zap.Uint64s("slow-stores", slowStoreIDs))
-		newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), core.SetLastAwakenTime(nowTime), opt)
-		resp.AwakenRegions = &pdpb.AwakenRegions{
-			AbnormalStores: slowStoreIDs,
+	if !c.isAPIServiceMode {
+		if needAwaken, slowStoreIDs := c.NeedAwakenAllRegionsInStore(storeID); needAwaken {
+			log.Info("forcibly awaken hibernated regions", zap.Uint64("store-id", storeID), zap.Uint64s("slow-stores", slowStoreIDs))
+			newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), core.SetLastAwakenTime(nowTime), opt)
+			resp.AwakenRegions = &pdpb.AwakenRegions{
+				AbnormalStores: slowStoreIDs,
+			}
+		} else {
+			newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), opt)
+		}
 	} else {
 		newStore = store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime), opt)
@@ -961,41 +975,47 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest
 		statistics.UpdateStoreHeartbeatMetrics(store)
 	}
 	c.core.PutStore(newStore)
-	c.hotStat.Observe(storeID, newStore.GetStoreStats())
-	c.hotStat.FilterUnhealthyStore(c)
-	c.slowStat.ObserveSlowStoreStatus(storeID, newStore.IsSlow())
-	reportInterval := stats.GetInterval()
-	interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
-
-	regions := make(map[uint64]*core.RegionInfo, len(stats.GetPeerStats()))
-	for _, peerStat := range stats.GetPeerStats() {
-		regionID := peerStat.GetRegionId()
-		region := c.GetRegion(regionID)
-		regions[regionID] = region
-		if region == nil {
-			log.Warn("discard hot peer stat for unknown region",
-				zap.Uint64("region-id", regionID),
-				zap.Uint64("store-id", storeID))
-			continue
-		}
-		peer := region.GetStorePeer(storeID)
-		if peer == nil {
-			log.Warn("discard hot peer stat for unknown region peer",
-				zap.Uint64("region-id", regionID),
-				zap.Uint64("store-id", storeID))
-			continue
-		}
-		readQueryNum := core.GetReadQueryNum(peerStat.GetQueryStats())
-		loads := []float64{
-			utils.RegionReadBytes:     float64(peerStat.GetReadBytes()),
-			utils.RegionReadKeys:      float64(peerStat.GetReadKeys()),
-			utils.RegionReadQueryNum:  float64(readQueryNum),
-			utils.RegionWriteBytes:    0,
-			utils.RegionWriteKeys:     0,
-			utils.RegionWriteQueryNum: 0,
+	var (
+		regions  map[uint64]*core.RegionInfo
+		interval uint64
+	)
+	if !c.isAPIServiceMode {
+		c.hotStat.Observe(storeID, newStore.GetStoreStats())
+		c.hotStat.FilterUnhealthyStore(c)
+		c.slowStat.ObserveSlowStoreStatus(storeID, newStore.IsSlow())
+		reportInterval := stats.GetInterval()
+		interval = reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
+
+		regions = make(map[uint64]*core.RegionInfo, len(stats.GetPeerStats()))
+		for _, peerStat := range stats.GetPeerStats() {
+			regionID := peerStat.GetRegionId()
+			region := c.GetRegion(regionID)
+			regions[regionID] = region
+			if region == nil {
+				log.Warn("discard hot peer stat for unknown region",
+					zap.Uint64("region-id", regionID),
+					zap.Uint64("store-id", storeID))
+				continue
+			}
+			peer := region.GetStorePeer(storeID)
+			if peer == nil {
+				log.Warn("discard hot peer stat for unknown region peer",
+					zap.Uint64("region-id", regionID),
+					zap.Uint64("store-id", storeID))
+				continue
+			}
+			readQueryNum := core.GetReadQueryNum(peerStat.GetQueryStats())
+			loads := []float64{
+				utils.RegionReadBytes:     float64(peerStat.GetReadBytes()),
+				utils.RegionReadKeys:      float64(peerStat.GetReadKeys()),
+				utils.RegionReadQueryNum:  float64(readQueryNum),
+				utils.RegionWriteBytes:    0,
+				utils.RegionWriteKeys:     0,
+				utils.RegionWriteQueryNum: 0,
+			}
+			peerInfo := core.NewPeerInfo(peer, loads, interval)
+			c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region))
 		}
-		peerInfo := core.NewPeerInfo(peer, loads, interval)
-		c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region))
 	}
 	for _, stat := range stats.GetSnapshotStats() {
 		// the duration of snapshot is the sum between to send and generate snapshot.
@@ -1015,8 +1035,10 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest
 		e := int64(dur)*2 - int64(stat.GetTotalDurationSec())
 		store.Feedback(float64(e))
 	}
-	// Here we will compare the reported regions with the previous hot peers to decide if it is still hot.
-	c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval))
+	if !c.isAPIServiceMode {
+		// Here we will compare the reported regions with the previous hot peers to decide if it is still hot.
+		c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval))
+	}
 	return nil
 }
@@ -1064,22 +1086,24 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
 	}
 	region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket())
-	c.hotStat.CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
-	c.hotStat.CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
-	reportInterval := region.GetInterval()
-	interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
-	for _, peer := range region.GetPeers() {
-		peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
-		c.hotStat.CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region))
+	if !c.isAPIServiceMode {
+		c.hotStat.CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
+		c.hotStat.CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
+		reportInterval := region.GetInterval()
+		interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
+		for _, peer := range region.GetPeers() {
+			peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
+			c.hotStat.CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region))
+		}
+		c.coordinator.GetSchedulersController().CheckTransferWitnessLeader(region)
 	}
-	c.coordinator.GetSchedulersController().CheckTransferWitnessLeader(region)
 	hasRegionStats := c.regionStats != nil
 	// Save to storage if meta is updated, except for flashback.
 	// Save to cache if meta or leader is updated, or contains any down/pending peer.
 	// Mark isNew if the region in cache does not have leader.
 	isNew, saveKV, saveCache, needSync := regionGuide(region, origin)
-	if !saveKV && !saveCache && !isNew {
+	if !c.isAPIServiceMode && !saveKV && !saveCache && !isNew {
 		// Due to some config changes need to update the region stats as well,
 		// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { @@ -1106,23 +1130,25 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { } for _, item := range overlaps { - if c.regionStats != nil { - c.regionStats.ClearDefunctRegion(item.GetID()) + if !c.isAPIServiceMode { + if c.regionStats != nil { + c.regionStats.ClearDefunctRegion(item.GetID()) + } + c.labelLevelStats.ClearDefunctRegion(item.GetID()) } - c.labelLevelStats.ClearDefunctRegion(item.GetID()) c.ruleManager.InvalidCache(item.GetID()) } regionUpdateCacheEventCounter.Inc() } - if hasRegionStats { - c.regionStats.Observe(region, c.getRegionStoresLocked(region)) + if !c.isAPIServiceMode { + if hasRegionStats { + c.regionStats.Observe(region, c.getRegionStoresLocked(region)) + } } - if !c.IsPrepared() && isNew { c.coordinator.GetPrepareChecker().Collect(region) } - if c.storage != nil { // If there are concurrent heartbeats from the same region, the last write will win even if // writes to storage in the critical area. So don't use mutex to protect it. @@ -1598,8 +1624,10 @@ func (c *RaftCluster) BuryStore(storeID uint64, forceBury bool) error { delete(c.prevStoreLimit, storeID) c.RemoveStoreLimit(storeID) c.resetProgress(storeID, store.GetAddress()) - c.hotStat.RemoveRollingStoreStats(storeID) - c.slowStat.RemoveSlowStoreStatus(storeID) + if !c.isAPIServiceMode { + c.hotStat.RemoveRollingStoreStats(storeID) + c.slowStat.RemoveSlowStoreStatus(storeID) + } } return err } @@ -1773,8 +1801,10 @@ func (c *RaftCluster) putStoreLocked(store *core.StoreInfo) error { } } c.core.PutStore(store) - c.hotStat.GetOrCreateRollingStoreStats(store.GetID()) - c.slowStat.ObserveSlowStoreStatus(store.GetID(), store.IsSlow()) + if !c.isAPIServiceMode { + c.hotStat.GetOrCreateRollingStoreStats(store.GetID()) + c.slowStat.ObserveSlowStoreStatus(store.GetID(), store.IsSlow()) + } return nil } @@ -2124,13 +2154,19 @@ func (c *RaftCluster) collectMetrics() { statsMap := statistics.NewStoreStatisticsMap(c.opt) stores := c.GetStores() for _, s := range stores { - statsMap.Observe(s, c.hotStat.StoresStats) + if !c.isAPIServiceMode { + statsMap.Observe(s, c.hotStat.StoresStats) + } else { + statsMap.Observe(s) + } } statsMap.Collect() - c.coordinator.GetSchedulersController().CollectSchedulerMetrics() - c.coordinator.CollectHotSpotMetrics() - c.collectClusterMetrics() + if !c.isAPIServiceMode { + c.coordinator.GetSchedulersController().CollectSchedulerMetrics() + c.coordinator.CollectHotSpotMetrics() + c.collectClusterMetrics() + } c.collectHealthStatus() } @@ -2138,9 +2174,11 @@ func (c *RaftCluster) resetMetrics() { statsMap := statistics.NewStoreStatisticsMap(c.opt) statsMap.Reset() - c.coordinator.GetSchedulersController().ResetSchedulerMetrics() - c.coordinator.ResetHotSpotMetrics() - c.resetClusterMetrics() + if !c.isAPIServiceMode { + c.coordinator.GetSchedulersController().ResetSchedulerMetrics() + c.coordinator.ResetHotSpotMetrics() + c.resetClusterMetrics() + } c.resetHealthStatus() c.resetProgressIndicator() } diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index 82113bc1656f..c1da97363b53 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -250,6 +250,8 @@ func (c *RaftCluster) HandleReportBuckets(b *metapb.Buckets) error { if err := c.processReportBuckets(b); err != nil { return err } - c.hotStat.CheckAsync(buckets.NewCheckPeerTask(b)) + if !c.isAPIServiceMode { + c.hotStat.CheckAsync(buckets.NewCheckPeerTask(b)) + } return nil }
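--
Illustrative notes (runnable sketches with simplified stand-in types, not PD's actual code):

The patch threads a single flag, isAPIServiceMode, through RaftCluster and uses it to fence off everything scheduling-related: the coordinator, hot/slow store statistics, the stats background jobs, and their metrics, while the non-scheduling jobs (node-state checks, region sync, replication mode, min-resolved-ts) keep running in both modes. A minimal sketch of that gating shape; raftCluster and runJob below are hypothetical stand-ins, not PD APIs:

package main

import (
	"fmt"
	"sync"
)

type raftCluster struct {
	isAPIServiceMode bool // set from s.IsAPIServiceMode() at Start, as in the patch
	wg               sync.WaitGroup
}

func (c *raftCluster) runJob(name string) {
	defer c.wg.Done()
	fmt.Println("started:", name)
}

// start mirrors the shape of the patched RaftCluster.Start: the
// scheduling-related jobs are skipped in API-service mode, the rest always run.
func (c *raftCluster) start() {
	if !c.isAPIServiceMode {
		c.wg.Add(3)
		go c.runJob("coordinator")
		go c.runJob("stats background jobs")
		go c.runJob("metrics collection")
	}
	c.wg.Add(2) // the patch adds 7 jobs here; two stand-ins keep the sketch short
	go c.runJob("node state check")
	go c.runJob("region sync")
	c.wg.Wait()
}

func main() {
	(&raftCluster{isAPIServiceMode: true}).start()  // scheduling service enabled
	(&raftCluster{isAPIServiceMode: false}).start() // classic single-process PD
}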
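In store_collection.go the flow-stats parameter of Observe becomes variadic, so the API-service path can record store status metrics without supplying a StoresStats source while existing callers keep passing one. A standalone sketch of this optional-argument pattern; storesStats and observe are simplified stand-ins:

package main

import "fmt"

type storesStats struct {
	rolling map[uint64]float64 // stand-in for the per-store rolling flow stats
}

// observe always records status metrics; flow metrics are recorded only when a
// stats source is supplied, mirroring the patched storeStatistics.Observe.
func observe(storeID uint64, stats ...*storesStats) {
	fmt.Printf("store %d: status metrics collected\n", storeID)
	if len(stats) == 0 {
		return // API-service mode: PD no longer maintains rolling store stats
	}
	flow, ok := stats[0].rolling[storeID]
	if !ok {
		return // mirrors the nil check on GetRollingStoreStats in the patch
	}
	fmt.Printf("store %d: flow metrics = %.1f\n", storeID, flow)
}

func main() {
	s := &storesStats{rolling: map[uint64]float64{1: 42.5}}
	observe(1, s) // classic mode, as in statsMap.Observe(s, c.hotStat.StoresStats)
	observe(1)    // API-service mode, as in statsMap.Observe(s)
}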
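The one-line schedulers.Register() added to CreateServerWrapper makes the scheduling microservice populate the scheduler registry before the server starts resolving schedulers by name. A hedged sketch of the register-before-use pattern this presumably relies on; the registry shape below is an assumption for illustration, not the actual pkg/schedule/schedulers API:

package main

import "fmt"

type scheduler interface{ Name() string }

type balanceLeader struct{}

func (balanceLeader) Name() string { return "balance-leader" }

// registry maps scheduler names to constructors; lookups by name at startup
// fail unless Register-style calls have already run.
var registry = map[string]func() scheduler{}

func register(name string, create func() scheduler) {
	if _, dup := registry[name]; dup {
		panic("duplicate scheduler registration: " + name)
	}
	registry[name] = create
}

func main() {
	// Must run before anything resolves scheduler names, mirroring the
	// placement of schedulers.Register() at the top of CreateServerWrapper.
	register("balance-leader", func() scheduler { return balanceLeader{} })

	if create, ok := registry["balance-leader"]; ok {
		fmt.Println("created:", create().Name())
	}
}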