diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go new file mode 100644 index 00000000000..0b3e0351b16 --- /dev/null +++ b/pkg/cluster/cluster.go @@ -0,0 +1,65 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cluster + +import ( + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/schedule" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/statistics" +) + +// Cluster provides an overview of a cluster's basic information. +type Cluster interface { + GetHotStat() *statistics.HotStat + GetRegionStats() *statistics.RegionStatistics + GetLabelStats() *statistics.LabelStatistics + GetCoordinator() *schedule.Coordinator + GetRuleManager() *placement.RuleManager +} + +// HandleStatsAsync handles the flow asynchronously. +func HandleStatsAsync(c Cluster, region *core.RegionInfo) { + c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region)) + c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region)) + reportInterval := region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + for _, peer := range region.GetPeers() { + peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval) + c.GetHotStat().CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region)) + } + c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region) +} + +// HandleOverlaps handles the overlap regions. +func HandleOverlaps(c Cluster, overlaps []*core.RegionInfo) { + for _, item := range overlaps { + if c.GetRegionStats() != nil { + c.GetRegionStats().ClearDefunctRegion(item.GetID()) + } + c.GetLabelStats().ClearDefunctRegion(item.GetID()) + c.GetRuleManager().InvalidCache(item.GetID()) + } +} + +// Collect collects the cluster information. +func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRegionStats, isNew, isPrepared bool) { + if hasRegionStats { + c.GetRegionStats().Observe(region, stores) + } + if !isPrepared && isNew { + c.GetCoordinator().GetPrepareChecker().Collect(region) + } +} diff --git a/pkg/core/region.go b/pkg/core/region.go index 1531f722d25..4540f7aafb3 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/replication_modepb" - "github.com/pingcap/kvproto/pkg/schedulingpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/utils/logutil" @@ -144,59 +143,31 @@ const ( InitClusterRegionThreshold = 100 ) -// RegionFromHeartbeat constructs a Region from region heartbeat. -func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo { - // Convert unit to MB. - // If region isn't empty and less than 1MB, use 1MB instead. - regionSize := heartbeat.GetApproximateSize() / units.MiB - // Due to https://github.com/tikv/tikv/pull/11170, if region size is not initialized, - // approximate size will be zero, and region size is zero not EmptyRegionApproximateSize - if heartbeat.GetApproximateSize() > 0 && regionSize < EmptyRegionApproximateSize { - regionSize = EmptyRegionApproximateSize - } - regionKvSize := heartbeat.GetApproximateKvSize() / units.MiB - - region := &RegionInfo{ - term: heartbeat.GetTerm(), - meta: heartbeat.GetRegion(), - leader: heartbeat.GetLeader(), - downPeers: heartbeat.GetDownPeers(), - pendingPeers: heartbeat.GetPendingPeers(), - cpuUsage: heartbeat.GetCpuUsage(), - writtenBytes: heartbeat.GetBytesWritten(), - writtenKeys: heartbeat.GetKeysWritten(), - readBytes: heartbeat.GetBytesRead(), - readKeys: heartbeat.GetKeysRead(), - approximateSize: int64(regionSize), - approximateKvSize: int64(regionKvSize), - approximateKeys: int64(heartbeat.GetApproximateKeys()), - interval: heartbeat.GetInterval(), - replicationStatus: heartbeat.GetReplicationStatus(), - queryStats: heartbeat.GetQueryStats(), - } - - for _, opt := range opts { - opt(region) - } - - if region.writtenKeys >= ImpossibleFlowSize || region.writtenBytes >= ImpossibleFlowSize { - region.writtenKeys = 0 - region.writtenBytes = 0 - } - if region.readKeys >= ImpossibleFlowSize || region.readBytes >= ImpossibleFlowSize { - region.readKeys = 0 - region.readBytes = 0 - } - - sort.Sort(peerStatsSlice(region.downPeers)) - sort.Sort(peerSlice(region.pendingPeers)) - - classifyVoterAndLearner(region) - return region +// RegionHeartbeatResponse is the interface for region heartbeat response. +type RegionHeartbeatResponse interface { + GetTargetPeer() *metapb.Peer + GetRegionId() uint64 +} + +// RegionHeartbeatRequest is the interface for region heartbeat request. +type RegionHeartbeatRequest interface { + GetTerm() uint64 + GetRegion() *metapb.Region + GetLeader() *metapb.Peer + GetDownPeers() []*pdpb.PeerStats + GetPendingPeers() []*metapb.Peer + GetBytesWritten() uint64 + GetKeysWritten() uint64 + GetBytesRead() uint64 + GetKeysRead() uint64 + GetInterval() *pdpb.TimeInterval + GetQueryStats() *pdpb.QueryStats + GetApproximateSize() uint64 + GetApproximateKeys() uint64 } -// RegionFromForward constructs a Region from forwarded region heartbeat. -func RegionFromForward(heartbeat *schedulingpb.RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo { +// RegionFromHeartbeat constructs a Region from region heartbeat. +func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo { // Convert unit to MB. // If region isn't empty and less than 1MB, use 1MB instead. regionSize := heartbeat.GetApproximateSize() / units.MiB @@ -212,7 +183,6 @@ func RegionFromForward(heartbeat *schedulingpb.RegionHeartbeatRequest, opts ...R leader: heartbeat.GetLeader(), downPeers: heartbeat.GetDownPeers(), pendingPeers: heartbeat.GetPendingPeers(), - cpuUsage: heartbeat.GetCpuUsage(), writtenBytes: heartbeat.GetBytesWritten(), writtenKeys: heartbeat.GetKeysWritten(), readBytes: heartbeat.GetBytesRead(), @@ -223,6 +193,13 @@ func RegionFromForward(heartbeat *schedulingpb.RegionHeartbeatRequest, opts ...R queryStats: heartbeat.GetQueryStats(), } + // scheduling service doesn't need the following fields. + if h, ok := heartbeat.(*pdpb.RegionHeartbeatRequest); ok { + region.approximateKvSize = int64(h.GetApproximateKvSize() / units.MiB) + region.replicationStatus = h.GetReplicationStatus() + region.cpuUsage = h.GetCpuUsage() + } + for _, opt := range opts { opt(region) } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 98d552a4c18..0b9924f230b 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -10,6 +10,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/schedulingpb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/cluster" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" @@ -39,7 +40,7 @@ type Cluster struct { ruleManager *placement.RuleManager labelerManager *labeler.RegionLabeler regionStats *statistics.RegionStatistics - labelLevelStats *statistics.LabelStatistics + labelStats *statistics.LabelStatistics hotStat *statistics.HotStat storage storage.Storage coordinator *schedule.Coordinator @@ -67,7 +68,7 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, labelerManager: labelerManager, persistConfig: persistConfig, hotStat: statistics.NewHotStat(ctx), - labelLevelStats: statistics.NewLabelStatistics(), + labelStats: statistics.NewLabelStatistics(), regionStats: statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager), storage: storage, clusterID: clusterID, @@ -87,6 +88,21 @@ func (c *Cluster) GetCoordinator() *schedule.Coordinator { return c.coordinator } +// GetHotStat gets hot stat. +func (c *Cluster) GetHotStat() *statistics.HotStat { + return c.hotStat +} + +// GetRegionStats gets region statistics. +func (c *Cluster) GetRegionStats() *statistics.RegionStatistics { + return c.regionStats +} + +// GetLabelStats gets label statistics. +func (c *Cluster) GetLabelStats() *statistics.LabelStatistics { + return c.labelStats +} + // GetBasicCluster returns the basic cluster. func (c *Cluster) GetBasicCluster() *core.BasicCluster { return c.BasicCluster @@ -288,7 +304,7 @@ func (c *Cluster) waitSchedulersInitialized() { // UpdateRegionsLabelLevelStats updates the status of the region label level by types. func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { for _, region := range regions { - c.labelLevelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels()) + c.labelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels()) } } @@ -411,15 +427,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error { region.InheritBuckets(origin) } - c.hotStat.CheckWriteAsync(statistics.NewCheckExpiredItemTask(region)) - c.hotStat.CheckReadAsync(statistics.NewCheckExpiredItemTask(region)) - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - for _, peer := range region.GetPeers() { - peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval) - c.hotStat.CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region)) - } - c.coordinator.GetSchedulersController().CheckTransferWitnessLeader(region) + cluster.HandleStatsAsync(c, region) hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. @@ -445,23 +453,10 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error { return err } - for _, item := range overlaps { - if c.regionStats != nil { - c.regionStats.ClearDefunctRegion(item.GetID()) - } - c.labelLevelStats.ClearDefunctRegion(item.GetID()) - c.ruleManager.InvalidCache(item.GetID()) - } - } - - if hasRegionStats { - c.regionStats.Observe(region, c.GetRegionStores(region)) - } - - if !c.IsPrepared() && isNew { - c.coordinator.GetPrepareChecker().Collect(region) + cluster.HandleOverlaps(c, overlaps) } + cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared()) return nil } diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index b6c92424aa9..4558688822a 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -27,7 +27,6 @@ import ( bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mcs/registry" - "github.com/tikv/pd/pkg/schedule/hbstream" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/logutil" "go.uber.org/zap" @@ -81,7 +80,7 @@ type heartbeatServer struct { closed int32 } -func (s *heartbeatServer) Send(m hbstream.RegionHeartbeatResponse) error { +func (s *heartbeatServer) Send(m core.RegionHeartbeatResponse) error { if atomic.LoadInt32(&s.closed) == 1 { return io.EOF } @@ -162,7 +161,7 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat s.hbStreams.BindStream(storeID, server) lastBind = time.Now() } - region := core.RegionFromForward(request, core.SetFromHeartbeat(true)) + region := core.RegionFromHeartbeat(request, core.SetFromHeartbeat(true)) err = c.HandleRegionHeartbeat(region) if err != nil { // TODO: if we need to send the error back to API server. diff --git a/pkg/mock/mockhbstream/mockhbstream.go b/pkg/mock/mockhbstream/mockhbstream.go index 672e3d81497..289f31d63dd 100644 --- a/pkg/mock/mockhbstream/mockhbstream.go +++ b/pkg/mock/mockhbstream/mockhbstream.go @@ -25,18 +25,18 @@ import ( // HeartbeatStream is used to mock HeartbeatStream for test use. type HeartbeatStream struct { - ch chan hbstream.RegionHeartbeatResponse + ch chan core.RegionHeartbeatResponse } // NewHeartbeatStream creates a new HeartbeatStream. func NewHeartbeatStream() HeartbeatStream { return HeartbeatStream{ - ch: make(chan hbstream.RegionHeartbeatResponse), + ch: make(chan core.RegionHeartbeatResponse), } } // Send mocks method. -func (s HeartbeatStream) Send(m hbstream.RegionHeartbeatResponse) error { +func (s HeartbeatStream) Send(m core.RegionHeartbeatResponse) error { select { case <-time.After(time.Second): return errors.New("timeout") @@ -52,7 +52,7 @@ func (s HeartbeatStream) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHeartb func (s HeartbeatStream) BindStream(storeID uint64, stream hbstream.HeartbeatStream) {} // Recv mocks method. -func (s HeartbeatStream) Recv() hbstream.RegionHeartbeatResponse { +func (s HeartbeatStream) Recv() core.RegionHeartbeatResponse { select { case <-time.After(time.Millisecond * 10): return nil diff --git a/pkg/schedule/hbstream/heartbeat_streams.go b/pkg/schedule/hbstream/heartbeat_streams.go index 18991174c36..e7d7f688035 100644 --- a/pkg/schedule/hbstream/heartbeat_streams.go +++ b/pkg/schedule/hbstream/heartbeat_streams.go @@ -32,12 +32,6 @@ import ( "go.uber.org/zap" ) -// RegionHeartbeatResponse is the interface for region heartbeat response. -type RegionHeartbeatResponse interface { - GetTargetPeer() *metapb.Peer - GetRegionId() uint64 -} - // Operation is detailed scheduling step of a region. type Operation struct { ChangePeer *pdpb.ChangePeer @@ -52,7 +46,7 @@ type Operation struct { // HeartbeatStream is an interface. type HeartbeatStream interface { - Send(RegionHeartbeatResponse) error + Send(core.RegionHeartbeatResponse) error } const ( @@ -72,7 +66,7 @@ type HeartbeatStreams struct { hbStreamCancel context.CancelFunc clusterID uint64 streams map[uint64]HeartbeatStream - msgCh chan RegionHeartbeatResponse + msgCh chan core.RegionHeartbeatResponse streamCh chan streamUpdate storeInformer core.StoreSetInformer typ string @@ -97,7 +91,7 @@ func newHbStreams(ctx context.Context, clusterID uint64, typ string, storeInform hbStreamCancel: hbStreamCancel, clusterID: clusterID, streams: make(map[uint64]HeartbeatStream), - msgCh: make(chan RegionHeartbeatResponse, heartbeatChanCapacity), + msgCh: make(chan core.RegionHeartbeatResponse, heartbeatChanCapacity), streamCh: make(chan streamUpdate, 1), storeInformer: storeInformer, typ: typ, @@ -118,7 +112,7 @@ func (s *HeartbeatStreams) run() { keepAliveTicker := time.NewTicker(heartbeatStreamKeepAliveInterval) defer keepAliveTicker.Stop() - var keepAlive RegionHeartbeatResponse + var keepAlive core.RegionHeartbeatResponse switch s.typ { case utils.SchedulingServiceName: keepAlive = &schedulingpb.RegionHeartbeatResponse{Header: &schedulingpb.ResponseHeader{ClusterId: s.clusterID}} @@ -208,7 +202,7 @@ func (s *HeartbeatStreams) SendMsg(region *core.RegionInfo, op *Operation) { } // TODO: use generic - var resp RegionHeartbeatResponse + var resp core.RegionHeartbeatResponse switch s.typ { case utils.SchedulingServiceName: resp = &schedulingpb.RegionHeartbeatResponse{ diff --git a/server/api/stats.go b/server/api/stats.go index e8b04ba588e..1798597b6cc 100644 --- a/server/api/stats.go +++ b/server/api/stats.go @@ -48,7 +48,7 @@ func (h *statsHandler) GetRegionStatus(w http.ResponseWriter, r *http.Request) { if r.URL.Query().Has("count") { stats = rc.GetRegionCount([]byte(startKey), []byte(endKey)) } else { - stats = rc.GetRegionStats([]byte(startKey), []byte(endKey)) + stats = rc.GetRegionStatsByRange([]byte(startKey), []byte(endKey)) } h.rd.JSON(w, http.StatusOK, stats) } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 94761c330b6..dbd640d6e8c 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/cluster" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/errs" @@ -894,11 +895,21 @@ func (c *RaftCluster) GetSuspectRegions() []uint64 { return c.coordinator.GetCheckerController().GetSuspectRegions() } -// GetHotStat gets hot stat for test. +// GetHotStat gets hot stat. func (c *RaftCluster) GetHotStat() *statistics.HotStat { return c.hotStat } +// GetRegionStats gets region statistics. +func (c *RaftCluster) GetRegionStats() *statistics.RegionStatistics { + return c.regionStats +} + +// GetLabelStats gets label statistics. +func (c *RaftCluster) GetLabelStats() *statistics.LabelStatistics { + return c.labelLevelStats +} + // RemoveSuspectRegion removes region from suspect list. func (c *RaftCluster) RemoveSuspectRegion(id uint64) { c.coordinator.GetCheckerController().RemoveSuspectRegion(id) @@ -1099,15 +1110,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { } if !c.isAPIServiceMode { - c.hotStat.CheckWriteAsync(statistics.NewCheckExpiredItemTask(region)) - c.hotStat.CheckReadAsync(statistics.NewCheckExpiredItemTask(region)) - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - for _, peer := range region.GetPeers() { - peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval) - c.hotStat.CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region)) - } - c.coordinator.GetSchedulersController().CheckTransferWitnessLeader(region) + cluster.HandleStatsAsync(c, region) } hasRegionStats := c.regionStats != nil @@ -1140,27 +1143,16 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil { return err } - - for _, item := range overlaps { - if !c.isAPIServiceMode { - if c.regionStats != nil { - c.regionStats.ClearDefunctRegion(item.GetID()) - } - c.labelLevelStats.ClearDefunctRegion(item.GetID()) - } - c.ruleManager.InvalidCache(item.GetID()) + if !c.isAPIServiceMode { + cluster.HandleOverlaps(c, overlaps) } regionUpdateCacheEventCounter.Inc() } if !c.isAPIServiceMode { - if hasRegionStats { - c.regionStats.Observe(region, c.getRegionStoresLocked(region)) - } - } - if !c.IsPrepared() && isNew { - c.coordinator.GetPrepareChecker().Collect(region) + cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared()) } + if c.storage != nil { // If there are concurrent heartbeats from the same region, the last write will win even if // writes to storage in the critical area. So don't use mutex to protect it. @@ -2333,8 +2325,8 @@ func (c *RaftCluster) PutMetaCluster(meta *metapb.Cluster) error { return c.putMetaLocked(typeutil.DeepClone(meta, core.ClusterFactory)) } -// GetRegionStats returns region statistics from cluster. -func (c *RaftCluster) GetRegionStats(startKey, endKey []byte) *statistics.RegionStats { +// GetRegionStatsByRange returns region statistics from cluster. +func (c *RaftCluster) GetRegionStatsByRange(startKey, endKey []byte) *statistics.RegionStats { return statistics.GetRegionStats(c.core.ScanRegions(startKey, endKey, -1)) } diff --git a/server/grpc_service.go b/server/grpc_service.go index 1844d43f54c..5e40bc1c732 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -37,7 +37,6 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/utils" - "github.com/tikv/pd/pkg/schedule/hbstream" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/tso" @@ -1094,7 +1093,7 @@ type heartbeatServer struct { closed int32 } -func (s *heartbeatServer) Send(m hbstream.RegionHeartbeatResponse) error { +func (s *heartbeatServer) Send(m core.RegionHeartbeatResponse) error { if atomic.LoadInt32(&s.closed) == 1 { return io.EOF } @@ -1391,7 +1390,6 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error Interval: request.GetInterval(), Term: request.GetTerm(), QueryStats: request.GetQueryStats(), - CpuUsage: request.GetCpuUsage(), } if err := schedulingStream.Send(req); err != nil { log.Error("forward region heartbeat failed", zap.Error(err))