diff --git a/pkg/core/metrics.go b/pkg/core/metrics.go new file mode 100644 index 00000000000..e6f3535b1d7 --- /dev/null +++ b/pkg/core/metrics.go @@ -0,0 +1,256 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" +) + +var ( + // HeartbeatBreakdownHandleDurationSum is the summary of the processing time of handle the heartbeat stage. + HeartbeatBreakdownHandleDurationSum = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "core", + Name: "region_heartbeat_breakdown_handle_duration_seconds_sum", + Help: "Bucketed histogram of processing time (s) of handle the heartbeat stage.", + }, []string{"name"}) + + // HeartbeatBreakdownHandleCount is the summary of the processing count of handle the heartbeat stage. + HeartbeatBreakdownHandleCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "core", + Name: "region_heartbeat_breakdown_handle_duration_seconds_count", + Help: "Bucketed histogram of processing count of handle the heartbeat stage.", + }, []string{"name"}) + // AcquireRegionsLockWaitDurationSum is the summary of the processing time of waiting for acquiring regions lock. + AcquireRegionsLockWaitDurationSum = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "core", + Name: "acquire_regions_lock_wait_duration_seconds_sum", + Help: "Bucketed histogram of processing time (s) of waiting for acquiring regions lock.", + }, []string{"type"}) + // AcquireRegionsLockWaitCount is the summary of the processing count of waiting for acquiring regions lock. 
+ AcquireRegionsLockWaitCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "core", + Name: "acquire_regions_lock_wait_duration_seconds_count", + Help: "Bucketed histogram of processing count of waiting for acquiring regions lock.", + }, []string{"name"}) + + // lock statistics + waitRegionsLockDurationSum = AcquireRegionsLockWaitDurationSum.WithLabelValues("WaitRegionsLock") + waitRegionsLockCount = AcquireRegionsLockWaitCount.WithLabelValues("WaitRegionsLock") + waitSubRegionsLockDurationSum = AcquireRegionsLockWaitDurationSum.WithLabelValues("WaitSubRegionsLock") + waitSubRegionsLockCount = AcquireRegionsLockWaitCount.WithLabelValues("WaitSubRegionsLock") + + // heartbeat breakdown statistics + preCheckDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("PreCheck") + preCheckCount = HeartbeatBreakdownHandleCount.WithLabelValues("PreCheck") + asyncHotStatsDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("AsyncHotStatsDuration") + asyncHotStatsCount = HeartbeatBreakdownHandleCount.WithLabelValues("AsyncHotStatsDuration") + regionGuideDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("RegionGuide") + regionGuideCount = HeartbeatBreakdownHandleCount.WithLabelValues("RegionGuide") + checkOverlapsDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("SaveCache_CheckOverlaps") + checkOverlapsCount = HeartbeatBreakdownHandleCount.WithLabelValues("SaveCache_CheckOverlaps") + validateRegionDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("SaveCache_InvalidRegion") + validateRegionCount = HeartbeatBreakdownHandleCount.WithLabelValues("SaveCache_InvalidRegion") + setRegionDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("SaveCache_SetRegion") + setRegionCount = HeartbeatBreakdownHandleCount.WithLabelValues("SaveCache_SetRegion") + updateSubTreeDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("SaveCache_UpdateSubTree") + updateSubTreeCount = HeartbeatBreakdownHandleCount.WithLabelValues("SaveCache_UpdateSubTree") + regionCollectDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("CollectRegionStats") + regionCollectCount = HeartbeatBreakdownHandleCount.WithLabelValues("CollectRegionStats") + otherDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("Other") + otherCount = HeartbeatBreakdownHandleCount.WithLabelValues("Other") +) + +func init() { + prometheus.MustRegister(HeartbeatBreakdownHandleDurationSum) + prometheus.MustRegister(HeartbeatBreakdownHandleCount) + prometheus.MustRegister(AcquireRegionsLockWaitDurationSum) + prometheus.MustRegister(AcquireRegionsLockWaitCount) +} + +type saveCacheStats struct { + startTime time.Time + lastCheckTime time.Time + checkOverlapsDuration time.Duration + validateRegionDuration time.Duration + setRegionDuration time.Duration + updateSubTreeDuration time.Duration +} + +// RegionHeartbeatProcessTracer is used to trace the process of handling region heartbeat. +type RegionHeartbeatProcessTracer interface { + Begin() + OnPreCheckFinished() + OnAsyncHotStatsFinished() + OnRegionGuideFinished() + OnSaveCacheBegin() + OnSaveCacheFinished() + OnCheckOverlapsFinished() + OnValidateRegionFinished() + OnSetRegionFinished() + OnUpdateSubTreeFinished() + OnCollectRegionStatsFinished() + OnAllStageFinished() + LogFields() []zap.Field +} + +type noopHeartbeatProcessTracer struct{} + +// NewNoopHeartbeatProcessTracer returns a noop heartbeat process tracer. 
+func NewNoopHeartbeatProcessTracer() RegionHeartbeatProcessTracer { + return &noopHeartbeatProcessTracer{} +} + +func (n *noopHeartbeatProcessTracer) Begin() {} +func (n *noopHeartbeatProcessTracer) OnPreCheckFinished() {} +func (n *noopHeartbeatProcessTracer) OnAsyncHotStatsFinished() {} +func (n *noopHeartbeatProcessTracer) OnRegionGuideFinished() {} +func (n *noopHeartbeatProcessTracer) OnSaveCacheBegin() {} +func (n *noopHeartbeatProcessTracer) OnSaveCacheFinished() {} +func (n *noopHeartbeatProcessTracer) OnCheckOverlapsFinished() {} +func (n *noopHeartbeatProcessTracer) OnValidateRegionFinished() {} +func (n *noopHeartbeatProcessTracer) OnSetRegionFinished() {} +func (n *noopHeartbeatProcessTracer) OnUpdateSubTreeFinished() {} +func (n *noopHeartbeatProcessTracer) OnCollectRegionStatsFinished() {} +func (n *noopHeartbeatProcessTracer) OnAllStageFinished() {} +func (n *noopHeartbeatProcessTracer) LogFields() []zap.Field { + return nil +} + +type regionHeartbeatProcessTracer struct { + startTime time.Time + lastCheckTime time.Time + preCheckDuration time.Duration + asyncHotStatsDuration time.Duration + regionGuideDuration time.Duration + saveCacheStats saveCacheStats + OtherDuration time.Duration +} + +// NewHeartbeatProcessTracer returns a heartbeat process tracer. +func NewHeartbeatProcessTracer() RegionHeartbeatProcessTracer { + return &regionHeartbeatProcessTracer{} +} + +func (h *regionHeartbeatProcessTracer) Begin() { + now := time.Now() + h.startTime = now + h.lastCheckTime = now +} + +func (h *regionHeartbeatProcessTracer) OnPreCheckFinished() { + now := time.Now() + h.preCheckDuration = now.Sub(h.lastCheckTime) + h.lastCheckTime = now + preCheckDurationSum.Add(h.preCheckDuration.Seconds()) + preCheckCount.Inc() +} + +func (h *regionHeartbeatProcessTracer) OnAsyncHotStatsFinished() { + now := time.Now() + h.asyncHotStatsDuration = now.Sub(h.lastCheckTime) + h.lastCheckTime = now + asyncHotStatsDurationSum.Add(h.asyncHotStatsDuration.Seconds()) + asyncHotStatsCount.Inc() +} + +func (h *regionHeartbeatProcessTracer) OnRegionGuideFinished() { + now := time.Now() + h.regionGuideDuration = now.Sub(h.lastCheckTime) + h.lastCheckTime = now + regionGuideDurationSum.Add(h.regionGuideDuration.Seconds()) + regionGuideCount.Inc() +} + +func (h *regionHeartbeatProcessTracer) OnSaveCacheBegin() { + now := time.Now() + h.saveCacheStats.startTime = now + h.saveCacheStats.lastCheckTime = now + h.lastCheckTime = now +} + +func (h *regionHeartbeatProcessTracer) OnSaveCacheFinished() { + // update the outer checkpoint time + h.lastCheckTime = time.Now() +} + +func (h *regionHeartbeatProcessTracer) OnCollectRegionStatsFinished() { + now := time.Now() + regionCollectDurationSum.Add(now.Sub(h.lastCheckTime).Seconds()) + regionCollectCount.Inc() + h.lastCheckTime = now +} + +func (h *regionHeartbeatProcessTracer) OnCheckOverlapsFinished() { + now := time.Now() + h.saveCacheStats.checkOverlapsDuration = now.Sub(h.lastCheckTime) + h.saveCacheStats.lastCheckTime = now + checkOverlapsDurationSum.Add(h.saveCacheStats.checkOverlapsDuration.Seconds()) + checkOverlapsCount.Inc() +} + +func (h *regionHeartbeatProcessTracer) OnValidateRegionFinished() { + now := time.Now() + h.saveCacheStats.validateRegionDuration = now.Sub(h.saveCacheStats.lastCheckTime) + h.saveCacheStats.lastCheckTime = now + validateRegionDurationSum.Add(h.saveCacheStats.validateRegionDuration.Seconds()) + validateRegionCount.Inc() +} + +func (h *regionHeartbeatProcessTracer) OnSetRegionFinished() { + now := time.Now() + 
h.saveCacheStats.setRegionDuration = now.Sub(h.saveCacheStats.lastCheckTime) + h.saveCacheStats.lastCheckTime = now + setRegionDurationSum.Add(h.saveCacheStats.setRegionDuration.Seconds()) + setRegionCount.Inc() +} + +func (h *regionHeartbeatProcessTracer) OnUpdateSubTreeFinished() { + now := time.Now() + h.saveCacheStats.updateSubTreeDuration = now.Sub(h.saveCacheStats.lastCheckTime) + h.saveCacheStats.lastCheckTime = now + updateSubTreeDurationSum.Add(h.saveCacheStats.updateSubTreeDuration.Seconds()) + updateSubTreeCount.Inc() +} + +func (h *regionHeartbeatProcessTracer) OnAllStageFinished() { + now := time.Now() + h.OtherDuration = now.Sub(h.lastCheckTime) + otherDurationSum.Add(h.OtherDuration.Seconds()) + otherCount.Inc() +} + +func (h *regionHeartbeatProcessTracer) LogFields() []zap.Field { + return []zap.Field{ + zap.Duration("pre-check-duration", h.preCheckDuration), + zap.Duration("async-hot-stats-duration", h.asyncHotStatsDuration), + zap.Duration("region-guide-duration", h.regionGuideDuration), + zap.Duration("check-overlaps-duration", h.saveCacheStats.checkOverlapsDuration), + zap.Duration("validate-region-duration", h.saveCacheStats.validateRegionDuration), + zap.Duration("set-region-duration", h.saveCacheStats.setRegionDuration), + zap.Duration("update-sub-tree-duration", h.saveCacheStats.updateSubTreeDuration), + zap.Duration("other-duration", h.OtherDuration), + } +} diff --git a/pkg/core/region.go b/pkg/core/region.go index ee03b5143ce..f7a4ef5f0fd 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -824,12 +824,49 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { } } +// RWLockStats is a read-write lock with statistics. +type RWLockStats struct { + syncutil.RWMutex + totalWaitTime int64 + lockCount int64 + lastLockCount int64 + lastTotalWaitTime int64 +} + +// Lock locks the lock and records the waiting time. +func (l *RWLockStats) Lock() { + startTime := time.Now() + l.RWMutex.Lock() + elapsed := time.Since(startTime).Nanoseconds() + atomic.AddInt64(&l.totalWaitTime, elapsed) + atomic.AddInt64(&l.lockCount, 1) +} + +// Unlock unlocks the lock. +func (l *RWLockStats) Unlock() { + l.RWMutex.Unlock() +} + +// RLock locks the lock for reading and records the waiting time. +func (l *RWLockStats) RLock() { + startTime := time.Now() + l.RWMutex.RLock() + elapsed := time.Since(startTime).Nanoseconds() + atomic.AddInt64(&l.totalWaitTime, elapsed) + atomic.AddInt64(&l.lockCount, 1) +} + +// RUnlock unlocks the lock for reading. +func (l *RWLockStats) RUnlock() { + l.RWMutex.RUnlock() +} + // RegionsInfo for export type RegionsInfo struct { - t syncutil.RWMutex + t RWLockStats tree *regionTree regions map[uint64]*regionItem // regionID -> regionInfo - st syncutil.RWMutex + st RWLockStats subRegions map[uint64]*regionItem // regionID -> regionInfo leaders map[uint64]*regionTree // storeID -> sub regionTree followers map[uint64]*regionTree // storeID -> sub regionTree @@ -896,33 +933,38 @@ func (r *RegionsInfo) PutRegion(region *RegionInfo) []*RegionInfo { } // PreCheckPutRegion checks if the region is valid to put. 
-func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*regionItem, error) { - origin, overlaps := r.GetRelevantRegions(region) +func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo, trace RegionHeartbeatProcessTracer) (*RegionInfo, []*regionItem, error) { + origin, overlaps := r.GetRelevantRegions(region, trace) err := check(region, origin, overlaps) return origin, overlaps, err } // AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put. -func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo, error) { +func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo, trace RegionHeartbeatProcessTracer) ([]*RegionInfo, error) { r.t.Lock() var ols []*regionItem origin := r.getRegionLocked(region.GetID()) if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) { ols = r.tree.overlaps(&regionItem{RegionInfo: region}) } + trace.OnCheckOverlapsFinished() err := check(region, origin, ols) if err != nil { r.t.Unlock() + trace.OnValidateRegionFinished() return nil, err } + trace.OnValidateRegionFinished() origin, overlaps, rangeChanged := r.setRegionLocked(region, true, ols...) r.t.Unlock() + trace.OnSetRegionFinished() r.UpdateSubTree(region, origin, overlaps, rangeChanged) + trace.OnUpdateSubTreeFinished() return overlaps, nil } // GetRelevantRegions returns the relevant regions for a given region. -func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*regionItem) { +func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo, trace RegionHeartbeatProcessTracer) (origin *RegionInfo, overlaps []*regionItem) { r.t.RLock() defer r.t.RUnlock() origin = r.getRegionLocked(region.GetID()) @@ -1653,6 +1695,42 @@ func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 { return size } +// metrics default poll interval +const defaultPollInterval = 15 * time.Second + +// CollectWaitLockMetrics collects the metrics of waiting time for lock +func (r *RegionsInfo) CollectWaitLockMetrics() { + regionsLockTotalWaitTime := atomic.LoadInt64(&r.t.totalWaitTime) + regionsLockCount := atomic.LoadInt64(&r.t.lockCount) + + lastRegionsLockTotalWaitTime := atomic.LoadInt64(&r.t.lastTotalWaitTime) + lastsRegionsLockCount := atomic.LoadInt64(&r.t.lastLockCount) + + subRegionsLockTotalWaitTime := atomic.LoadInt64(&r.st.totalWaitTime) + subRegionsLockCount := atomic.LoadInt64(&r.st.lockCount) + + lastSubRegionsLockTotalWaitTime := atomic.LoadInt64(&r.st.lastTotalWaitTime) + lastSubRegionsLockCount := atomic.LoadInt64(&r.st.lastLockCount) + + // update last metrics + atomic.StoreInt64(&r.t.lastTotalWaitTime, regionsLockTotalWaitTime) + atomic.StoreInt64(&r.t.lastLockCount, regionsLockCount) + atomic.StoreInt64(&r.st.lastTotalWaitTime, subRegionsLockTotalWaitTime) + atomic.StoreInt64(&r.st.lastLockCount, subRegionsLockCount) + + // skip invalid situation like initial status + if lastRegionsLockTotalWaitTime == 0 || lastsRegionsLockCount == 0 || lastSubRegionsLockTotalWaitTime == 0 || lastSubRegionsLockCount == 0 || + regionsLockTotalWaitTime-lastRegionsLockTotalWaitTime < 0 || regionsLockTotalWaitTime-lastRegionsLockTotalWaitTime > int64(defaultPollInterval) || + subRegionsLockTotalWaitTime-lastSubRegionsLockTotalWaitTime < 0 || subRegionsLockTotalWaitTime-lastSubRegionsLockTotalWaitTime > int64(defaultPollInterval) { + return + } + + waitRegionsLockDurationSum.Add(time.Duration(regionsLockTotalWaitTime - 
lastRegionsLockTotalWaitTime).Seconds()) + waitRegionsLockCount.Add(float64(regionsLockCount - lastsRegionsLockCount)) + waitSubRegionsLockDurationSum.Add(time.Duration(subRegionsLockTotalWaitTime - lastSubRegionsLockTotalWaitTime).Seconds()) + waitSubRegionsLockCount.Add(float64(subRegionsLockCount - lastSubRegionsLockCount)) +} + // GetAdjacentRegions returns region's info that is adjacent with specific region func (r *RegionsInfo) GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo) { r.t.RLock() diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go index 1e3b6073dda..3c6536a6a77 100644 --- a/pkg/core/region_test.go +++ b/pkg/core/region_test.go @@ -459,9 +459,9 @@ func TestSetRegionConcurrence(t *testing.T) { regions := NewRegionsInfo() region := NewTestRegionInfo(1, 1, []byte("a"), []byte("b")) go func() { - regions.AtomicCheckAndPutRegion(region) + regions.AtomicCheckAndPutRegion(region, NewNoopHeartbeatProcessTracer()) }() - regions.AtomicCheckAndPutRegion(region) + regions.AtomicCheckAndPutRegion(region, NewNoopHeartbeatProcessTracer()) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/core/UpdateSubTree")) } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 9e75057621e..1b915b6874d 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -500,6 +500,8 @@ func (c *Cluster) collectMetrics() { c.labelStats.Collect() // collect hot cache metrics c.hotStat.CollectMetrics() + // collect the lock metrics + c.RegionsInfo.CollectWaitLockMetrics() } func (c *Cluster) resetMetrics() { @@ -536,28 +538,36 @@ func (c *Cluster) IsBackgroundJobsRunning() bool { // HandleRegionHeartbeat processes RegionInfo reports from client. func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error { - if err := c.processRegionHeartbeat(region); err != nil { + tracer := core.NewNoopHeartbeatProcessTracer() + if c.persistConfig.GetScheduleConfig().EnableHeartbeatBreakdownMetrics { + tracer = core.NewHeartbeatProcessTracer() + } + tracer.Begin() + if err := c.processRegionHeartbeat(region, tracer); err != nil { + tracer.OnAllStageFinished() return err } - + tracer.OnAllStageFinished() c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL) return nil } // processRegionHeartbeat updates the region information. -func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error { - origin, _, err := c.PreCheckPutRegion(region) +func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo, tracer core.RegionHeartbeatProcessTracer) error { + origin, _, err := c.PreCheckPutRegion(region, tracer) + tracer.OnPreCheckFinished() if err != nil { return err } region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) cluster.HandleStatsAsync(c, region) - + tracer.OnAsyncHotStatsFinished() hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin) + if !saveCache { // Due to some config changes need to update the region stats as well, // so we do some extra checks here. 
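Note on the tracer design above: each On*Finished hook measures its stage by diffing against a rolling lastCheckTime checkpoint and feeds a per-stage `_sum`/`_count` counter pair, effectively a hand-rolled summary whose average stage latency falls out of `rate(sum)/rate(count)` at query time. A minimal, self-contained sketch of that pattern follows; the metric names, the `stageTimer` type, and the stage labels are invented for illustration and are not the identifiers added by this patch.

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Illustrative sum/count counter pair, mirroring the pattern of
// HeartbeatBreakdownHandleDurationSum / HeartbeatBreakdownHandleCount.
// The metric names here are made up for the demo.
var (
	stageDurationSum = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "demo_stage_duration_seconds_sum",
		Help: "Accumulated processing time (s) per stage.",
	}, []string{"name"})
	stageCount = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "demo_stage_duration_seconds_count",
		Help: "Number of times each stage ran.",
	}, []string{"name"})
)

// stageTimer reproduces the tracer's checkpoint logic: each call measures the
// time since the previous checkpoint and attributes it to the named stage.
type stageTimer struct {
	lastCheck time.Time
}

func (t *stageTimer) begin() { t.lastCheck = time.Now() }

func (t *stageTimer) checkpoint(stage string) time.Duration {
	now := time.Now()
	d := now.Sub(t.lastCheck)
	t.lastCheck = now
	stageDurationSum.WithLabelValues(stage).Add(d.Seconds())
	stageCount.WithLabelValues(stage).Inc()
	return d
}

func main() {
	prometheus.MustRegister(stageDurationSum, stageCount)

	var t stageTimer
	t.begin()
	time.Sleep(10 * time.Millisecond) // stand-in for the PreCheck stage
	fmt.Println("PreCheck took", t.checkpoint("PreCheck"))
	time.Sleep(5 * time.Millisecond) // stand-in for the RegionGuide stage
	fmt.Println("RegionGuide took", t.checkpoint("RegionGuide"))

	// On a dashboard, average stage latency can then be derived with
	// rate(<metric>_sum[1m]) / rate(<metric>_count[1m]).
}
```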
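The `RWLockStats` wrapper and `CollectWaitLockMetrics` above follow a related pattern: lock-wait time is accumulated with atomics on every acquisition, and the collector exports only the delta since the previous poll, skipping the first and implausible samples. Below is a rough standalone sketch of the same idea, assuming a plain `sync.RWMutex` and a simplified skip condition; the real code also bounds the delta by the poll interval and tracks the regions and sub-regions locks separately.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// lockStats is a cut-down analogue of RWLockStats (plain sync.RWMutex instead
// of PD's syncutil wrapper): it accumulates how long callers waited to acquire
// the lock, using atomics so a metrics collector can read it without locking.
type lockStats struct {
	sync.RWMutex
	totalWaitNanos int64
	lockCount      int64
	lastWaitNanos  int64
	lastLockCount  int64
}

func (l *lockStats) Lock() {
	start := time.Now()
	l.RWMutex.Lock()
	atomic.AddInt64(&l.totalWaitNanos, time.Since(start).Nanoseconds())
	atomic.AddInt64(&l.lockCount, 1)
}

// collect reports the delta since the previous poll, the same idea as
// CollectWaitLockMetrics, but with a simplified validity check.
func (l *lockStats) collect() (wait time.Duration, count int64, ok bool) {
	curWait := atomic.LoadInt64(&l.totalWaitNanos)
	curCount := atomic.LoadInt64(&l.lockCount)
	lastWait := atomic.SwapInt64(&l.lastWaitNanos, curWait)
	lastCount := atomic.SwapInt64(&l.lastLockCount, curCount)
	if lastCount == 0 || curWait < lastWait { // first poll or bogus sample
		return 0, 0, false
	}
	return time.Duration(curWait - lastWait), curCount - lastCount, true
}

func main() {
	var l lockStats

	l.Lock()
	l.Unlock()
	l.collect() // prime the "last" counters; the first poll is skipped

	l.Lock()
	done := make(chan struct{})
	go func() { l.Lock(); l.Unlock(); close(done) }() // blocks until main unlocks
	time.Sleep(20 * time.Millisecond)
	l.Unlock()
	<-done

	if wait, count, ok := l.collect(); ok {
		fmt.Printf("waited %v across %d acquisitions since the last poll\n", wait, count)
	}
}
```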
@@ -566,21 +576,23 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error { } return nil } - + tracer.OnSaveCacheBegin() var overlaps []*core.RegionInfo if saveCache { // To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one, // check its validation again here. // // However, it can't solve the race condition of concurrent heartbeats from the same region. - if overlaps, err = c.AtomicCheckAndPutRegion(region); err != nil { + if overlaps, err = c.AtomicCheckAndPutRegion(region, tracer); err != nil { + tracer.OnSaveCacheFinished() return err } cluster.HandleOverlaps(c, overlaps) } - + tracer.OnSaveCacheFinished() cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats) + tracer.OnCollectRegionStatsFinished() return nil } diff --git a/pkg/schedule/config/config.go b/pkg/schedule/config/config.go index 528f3a611c9..56038ddcb09 100644 --- a/pkg/schedule/config/config.go +++ b/pkg/schedule/config/config.go @@ -49,14 +49,15 @@ const ( defaultSlowStoreEvictingAffectedStoreRatioThreshold = 0.3 defaultMaxMovableHotPeerSize = int64(512) - defaultEnableJointConsensus = true - defaultEnableTiKVSplitRegion = true - defaultEnableCrossTableMerge = true - defaultEnableDiagnostic = true - defaultStrictlyMatchLabel = false - defaultEnablePlacementRules = true - defaultEnableWitness = false - defaultHaltScheduling = false + defaultEnableJointConsensus = true + defaultEnableTiKVSplitRegion = true + defaultEnableHeartbeatBreakdownMetrics = true + defaultEnableCrossTableMerge = true + defaultEnableDiagnostic = true + defaultStrictlyMatchLabel = false + defaultEnablePlacementRules = true + defaultEnableWitness = false + defaultHaltScheduling = false defaultRegionScoreFormulaVersion = "v2" defaultLeaderSchedulePolicy = "count" @@ -263,6 +264,9 @@ type ScheduleConfig struct { // on ebs-based BR we need to disable it with TTL EnableTiKVSplitRegion bool `toml:"enable-tikv-split-region" json:"enable-tikv-split-region,string"` + // EnableHeartbeatBreakdownMetrics is the option to enable heartbeat stats metrics. 
+ EnableHeartbeatBreakdownMetrics bool `toml:"enable-heartbeat-breakdown-metrics" json:"enable-heartbeat-breakdown-metrics,string"` + // Schedulers support for loading customized schedulers Schedulers SchedulerConfigs `toml:"schedulers" json:"schedulers-v2"` // json v2 is for the sake of compatible upgrade @@ -373,6 +377,11 @@ func (c *ScheduleConfig) Adjust(meta *configutil.ConfigMetaData, reloading bool) if !meta.IsDefined("enable-tikv-split-region") { c.EnableTiKVSplitRegion = defaultEnableTiKVSplitRegion } + + if !meta.IsDefined("enable-heartbeat-breakdown-metrics") { + c.EnableHeartbeatBreakdownMetrics = defaultEnableHeartbeatBreakdownMetrics + } + if !meta.IsDefined("enable-cross-table-merge") { c.EnableCrossTableMerge = defaultEnableCrossTableMerge } diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index cd9a87aaf54..ffbd71d2f1e 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -200,7 +200,8 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { region = core.NewRegionInfo(r, regionLeader, core.SetSource(core.Sync)) } - origin, _, err := bc.PreCheckPutRegion(region) + tracer := core.NewNoopHeartbeatProcessTracer() + origin, _, err := bc.PreCheckPutRegion(region, tracer) if err != nil { log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err)) continue diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index c69e487c3db..354e12020e3 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -990,21 +990,24 @@ func (c *RaftCluster) processReportBuckets(buckets *metapb.Buckets) error { var regionGuide = core.GenerateRegionGuideFunc(true) // processRegionHeartbeat updates the region information. -func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { - origin, _, err := c.core.PreCheckPutRegion(region) +func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo, tracer core.RegionHeartbeatProcessTracer) error { + origin, _, err := c.core.PreCheckPutRegion(region, tracer) + tracer.OnPreCheckFinished() if err != nil { return err } + region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { cluster.HandleStatsAsync(c, region) } - + tracer.OnAsyncHotStatsFinished() hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. saveKV, saveCache, needSync := regionGuide(region, origin) + tracer.OnRegionGuideFinished() if !saveKV && !saveCache { // Due to some config changes need to update the region stats as well, // so we do some extra checks here. @@ -1016,11 +1019,10 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { } return nil } - failpoint.Inject("concurrentRegionHeartbeat", func() { time.Sleep(500 * time.Millisecond) }) - + tracer.OnSaveCacheBegin() var overlaps []*core.RegionInfo if saveCache { failpoint.Inject("decEpoch", func() { @@ -1030,7 +1032,8 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { // check its validation again here. // // However, it can't solve the race condition of concurrent heartbeats from the same region. 
- if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil { + if overlaps, err = c.core.AtomicCheckAndPutRegion(region, tracer); err != nil { + tracer.OnSaveCacheFinished() return err } if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { @@ -1039,11 +1042,12 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { regionUpdateCacheEventCounter.Inc() } + tracer.OnSaveCacheFinished() // TODO: Due to the accuracy requirements of the API "/regions/check/xxx", // region stats needs to be collected in API mode. // We need to think of a better way to reduce this part of the cost in the future. cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats) - + tracer.OnCollectRegionStatsFinished() if c.storage != nil { // If there are concurrent heartbeats from the same region, the last write will win even if // writes to storage in the critical area. So don't use mutex to protect it. @@ -1074,7 +1078,6 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { default: } } - return nil } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 11c2c8c7836..dc0f7966761 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -50,6 +50,7 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" @@ -630,7 +631,7 @@ func TestRegionHeartbeatHotStat(t *testing.T) { region := core.NewRegionInfo(regionMeta, leader, core.WithInterval(&pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: utils.RegionHeartBeatReportInterval}), core.SetWrittenBytes(30000*10), core.SetWrittenKeys(300000*10)) - err = cluster.processRegionHeartbeat(region) + err = cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer()) re.NoError(err) // wait HotStat to update items time.Sleep(time.Second) @@ -643,7 +644,7 @@ func TestRegionHeartbeatHotStat(t *testing.T) { StoreId: 4, } region = region.Clone(core.WithRemoveStorePeer(2), core.WithAddPeer(newPeer)) - err = cluster.processRegionHeartbeat(region) + err = cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer()) re.NoError(err) // wait HotStat to update items time.Sleep(time.Second) @@ -680,8 +681,8 @@ func TestBucketHeartbeat(t *testing.T) { re.NoError(cluster.putStoreLocked(store)) } - re.NoError(cluster.processRegionHeartbeat(regions[0])) - re.NoError(cluster.processRegionHeartbeat(regions[1])) + re.NoError(cluster.processRegionHeartbeat(regions[0], core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(regions[1], core.NewNoopHeartbeatProcessTracer())) re.Nil(cluster.GetRegion(uint64(1)).GetBuckets()) re.NoError(cluster.processReportBuckets(buckets)) re.Equal(buckets, cluster.GetRegion(uint64(1)).GetBuckets()) @@ -700,13 +701,13 @@ func TestBucketHeartbeat(t *testing.T) { // case5: region update should inherit buckets. 
newRegion := regions[1].Clone(core.WithIncConfVer(), core.SetBuckets(nil)) opt.SetRegionBucketEnabled(true) - re.NoError(cluster.processRegionHeartbeat(newRegion)) + re.NoError(cluster.processRegionHeartbeat(newRegion, core.NewNoopHeartbeatProcessTracer())) re.Len(cluster.GetRegion(uint64(1)).GetBuckets().GetKeys(), 2) // case6: disable region bucket in opt.SetRegionBucketEnabled(false) newRegion2 := regions[1].Clone(core.WithIncConfVer(), core.SetBuckets(nil)) - re.NoError(cluster.processRegionHeartbeat(newRegion2)) + re.NoError(cluster.processRegionHeartbeat(newRegion2, core.NewNoopHeartbeatProcessTracer())) re.Nil(cluster.GetRegion(uint64(1)).GetBuckets()) re.Empty(cluster.GetRegion(uint64(1)).GetBuckets().GetKeys()) } @@ -732,25 +733,25 @@ func TestRegionHeartbeat(t *testing.T) { for i, region := range regions { // region does not exist. - re.NoError(cluster.processRegionHeartbeat(region)) + re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // region is the same, not updated. - re.NoError(cluster.processRegionHeartbeat(region)) + re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) origin := region // region is updated. region = origin.Clone(core.WithIncVersion()) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region)) + re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // region is stale (Version). stale := origin.Clone(core.WithIncConfVer()) - re.Error(cluster.processRegionHeartbeat(stale)) + re.Error(cluster.processRegionHeartbeat(stale, core.NewNoopHeartbeatProcessTracer())) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) @@ -760,13 +761,13 @@ func TestRegionHeartbeat(t *testing.T) { core.WithIncConfVer(), ) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region)) + re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // region is stale (ConfVer). stale = origin.Clone(core.WithIncConfVer()) - re.Error(cluster.processRegionHeartbeat(stale)) + re.Error(cluster.processRegionHeartbeat(stale, core.NewNoopHeartbeatProcessTracer())) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) @@ -778,38 +779,38 @@ func TestRegionHeartbeat(t *testing.T) { }, })) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region)) + re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) checkRegions(re, cluster.core, regions[:i+1]) // Add a pending peer. region = region.Clone(core.WithPendingPeers([]*metapb.Peer{region.GetPeers()[rand.Intn(len(region.GetPeers()))]})) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region)) + re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) checkRegions(re, cluster.core, regions[:i+1]) // Clear down peers. 
region = region.Clone(core.WithDownPeers(nil)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region)) + re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) checkRegions(re, cluster.core, regions[:i+1]) // Clear pending peers. region = region.Clone(core.WithPendingPeers(nil)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region)) + re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) checkRegions(re, cluster.core, regions[:i+1]) // Remove peers. origin = region region = origin.Clone(core.SetPeers(region.GetPeers()[:1])) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region)) + re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // Add peers. region = origin regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region)) + re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) @@ -819,47 +820,47 @@ func TestRegionHeartbeat(t *testing.T) { core.WithIncConfVer(), ) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region)) + re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) checkRegions(re, cluster.core, regions[:i+1]) // Change leader. region = region.Clone(core.WithLeader(region.GetPeers()[1])) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region)) + re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) checkRegions(re, cluster.core, regions[:i+1]) // Change ApproximateSize. region = region.Clone(core.SetApproximateSize(144)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region)) + re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) checkRegions(re, cluster.core, regions[:i+1]) // Change ApproximateKeys. region = region.Clone(core.SetApproximateKeys(144000)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region)) + re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) checkRegions(re, cluster.core, regions[:i+1]) // Change bytes written. region = region.Clone(core.SetWrittenBytes(24000)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region)) + re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) checkRegions(re, cluster.core, regions[:i+1]) // Change bytes read. 
region = region.Clone(core.SetReadBytes(1080000)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region)) + re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) checkRegions(re, cluster.core, regions[:i+1]) // Flashback region = region.Clone(core.WithFlashback(true, 1)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region)) + re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) checkRegions(re, cluster.core, regions[:i+1]) region = region.Clone(core.WithFlashback(false, 0)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region)) + re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) checkRegions(re, cluster.core, regions[:i+1]) } @@ -915,7 +916,8 @@ func TestRegionHeartbeat(t *testing.T) { core.WithNewRegionID(10000), core.WithDecVersion(), ) - re.Error(cluster.processRegionHeartbeat(overlapRegion)) + tracer := core.NewHeartbeatProcessTracer() + re.Error(cluster.processRegionHeartbeat(overlapRegion, tracer)) region := &metapb.Region{} ok, err := storage.LoadRegion(regions[n-1].GetID(), region) re.True(ok) @@ -939,7 +941,14 @@ func TestRegionHeartbeat(t *testing.T) { core.WithStartKey(regions[n-2].GetStartKey()), core.WithNewRegionID(regions[n-1].GetID()+1), ) - re.NoError(cluster.processRegionHeartbeat(overlapRegion)) + tracer = core.NewHeartbeatProcessTracer() + tracer.Begin() + re.NoError(cluster.processRegionHeartbeat(overlapRegion, tracer)) + tracer.OnAllStageFinished() + re.Condition(func() bool { + fields := tracer.LogFields() + return slice.AllOf(fields, func(i int) bool { return fields[i].Integer > 0 }) + }, "should have stats") region = &metapb.Region{} ok, err = storage.LoadRegion(regions[n-1].GetID(), region) re.False(ok) @@ -968,7 +977,7 @@ func TestRegionFlowChanged(t *testing.T) { regions := []*core.RegionInfo{core.NewTestRegionInfo(1, 1, []byte{}, []byte{})} processRegions := func(regions []*core.RegionInfo) { for _, r := range regions { - cluster.processRegionHeartbeat(r) + cluster.processRegionHeartbeat(r, core.NewNoopHeartbeatProcessTracer()) } } regions = core.SplitRegions(regions) @@ -1004,7 +1013,7 @@ func TestRegionSizeChanged(t *testing.T) { core.SetApproximateKeys(curMaxMergeKeys-1), core.SetSource(core.Heartbeat), ) - cluster.processRegionHeartbeat(region) + cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer()) regionID := region.GetID() re.True(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) // Test ApproximateSize and ApproximateKeys change. region = region.Clone( core.SetApproximateSize(curMaxMergeSize+1), core.SetApproximateKeys(curMaxMergeKeys+1), core.SetSource(core.Heartbeat), ) - cluster.processRegionHeartbeat(region) + cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer()) re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) // Test MaxMergeRegionSize and MaxMergeRegionKeys change.
cluster.opt.SetMaxMergeRegionSize(uint64(curMaxMergeSize + 2)) cluster.opt.SetMaxMergeRegionKeys(uint64(curMaxMergeKeys + 2)) - cluster.processRegionHeartbeat(region) + cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer()) re.True(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) cluster.opt.SetMaxMergeRegionSize(uint64(curMaxMergeSize)) cluster.opt.SetMaxMergeRegionKeys(uint64(curMaxMergeKeys)) - cluster.processRegionHeartbeat(region) + cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer()) re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) } @@ -1086,11 +1095,11 @@ func TestConcurrentRegionHeartbeat(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/concurrentRegionHeartbeat", "return(true)")) go func() { defer wg.Done() - cluster.processRegionHeartbeat(source) + cluster.processRegionHeartbeat(source, core.NewNoopHeartbeatProcessTracer()) }() time.Sleep(100 * time.Millisecond) re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/concurrentRegionHeartbeat")) - re.NoError(cluster.processRegionHeartbeat(target)) + re.NoError(cluster.processRegionHeartbeat(target, core.NewNoopHeartbeatProcessTracer())) wg.Wait() checkRegion(re, cluster.GetRegionByKey([]byte{}), target) } @@ -1152,7 +1161,7 @@ func TestRegionLabelIsolationLevel(t *testing.T) { func heartbeatRegions(re *require.Assertions, cluster *RaftCluster, regions []*core.RegionInfo) { // Heartbeat and check region one by one. for _, r := range regions { - re.NoError(cluster.processRegionHeartbeat(r)) + re.NoError(cluster.processRegionHeartbeat(r, core.NewNoopHeartbeatProcessTracer())) checkRegion(re, cluster.GetRegion(r.GetID()), r) checkRegion(re, cluster.GetRegionByKey(r.GetStartKey()), r) @@ -1189,7 +1198,7 @@ func TestHeartbeatSplit(t *testing.T) { // 1: [nil, nil) region1 := core.NewRegionInfo(&metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil) - re.NoError(cluster.processRegionHeartbeat(region1)) + re.NoError(cluster.processRegionHeartbeat(region1, core.NewNoopHeartbeatProcessTracer())) checkRegion(re, cluster.GetRegionByKey([]byte("foo")), region1) // split 1 to 2: [nil, m) 1: [m, nil), sync 2 first. @@ -1198,12 +1207,12 @@ func TestHeartbeatSplit(t *testing.T) { core.WithIncVersion(), ) region2 := core.NewRegionInfo(&metapb.Region{Id: 2, EndKey: []byte("m"), RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil) - re.NoError(cluster.processRegionHeartbeat(region2)) + re.NoError(cluster.processRegionHeartbeat(region2, core.NewNoopHeartbeatProcessTracer())) checkRegion(re, cluster.GetRegionByKey([]byte("a")), region2) // [m, nil) is missing before r1's heartbeat. re.Nil(cluster.GetRegionByKey([]byte("z"))) - re.NoError(cluster.processRegionHeartbeat(region1)) + re.NoError(cluster.processRegionHeartbeat(region1, core.NewNoopHeartbeatProcessTracer())) checkRegion(re, cluster.GetRegionByKey([]byte("z")), region1) // split 1 to 3: [m, q) 1: [q, nil), sync 1 first. 
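The `LogFields` assertion in `TestRegionHeartbeat` above checks that every traced field has `Integer > 0`. That works because `zap.Duration` encodes its value into `Field.Integer` as nanoseconds, as this tiny sketch illustrates (the field name is just an example):

```go
package main

import (
	"fmt"
	"time"

	"go.uber.org/zap"
)

func main() {
	// zap.Duration stores the value in Field.Integer as nanoseconds, which is
	// why the test can simply assert that every traced field has Integer > 0.
	f := zap.Duration("pre-check-duration", 42*time.Millisecond)
	fmt.Println(f.Key, time.Duration(f.Integer)) // pre-check-duration 42ms
}
```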
@@ -1212,12 +1221,12 @@ func TestHeartbeatSplit(t *testing.T) { core.WithIncVersion(), ) region3 := core.NewRegionInfo(&metapb.Region{Id: 3, StartKey: []byte("m"), EndKey: []byte("q"), RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil) - re.NoError(cluster.processRegionHeartbeat(region1)) + re.NoError(cluster.processRegionHeartbeat(region1, core.NewNoopHeartbeatProcessTracer())) checkRegion(re, cluster.GetRegionByKey([]byte("z")), region1) checkRegion(re, cluster.GetRegionByKey([]byte("a")), region2) // [m, q) is missing before r3's heartbeat. re.Nil(cluster.GetRegionByKey([]byte("n"))) - re.NoError(cluster.processRegionHeartbeat(region3)) + re.NoError(cluster.processRegionHeartbeat(region3, core.NewNoopHeartbeatProcessTracer())) checkRegion(re, cluster.GetRegionByKey([]byte("n")), region3) } @@ -1513,11 +1522,11 @@ func TestUpdateStorePendingPeerCount(t *testing.T) { }, } origin := core.NewRegionInfo(&metapb.Region{Id: 1, Peers: peers[:3]}, peers[0], core.WithPendingPeers(peers[1:3])) - re.NoError(tc.processRegionHeartbeat(origin)) + re.NoError(tc.processRegionHeartbeat(origin, core.NewNoopHeartbeatProcessTracer())) time.Sleep(50 * time.Millisecond) checkPendingPeerCount([]int{0, 1, 1, 0}, tc.RaftCluster, re) newRegion := core.NewRegionInfo(&metapb.Region{Id: 1, Peers: peers[1:]}, peers[1], core.WithPendingPeers(peers[3:4])) - re.NoError(tc.processRegionHeartbeat(newRegion)) + re.NoError(tc.processRegionHeartbeat(newRegion, core.NewNoopHeartbeatProcessTracer())) time.Sleep(50 * time.Millisecond) checkPendingPeerCount([]int{0, 0, 0, 1}, tc.RaftCluster, re) } @@ -2950,12 +2959,12 @@ func TestShouldRun(t *testing.T) { for _, testCase := range testCases { r := tc.GetRegion(testCase.regionID) nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat)) - re.NoError(tc.processRegionHeartbeat(nr)) + re.NoError(tc.processRegionHeartbeat(nr, core.NewNoopHeartbeatProcessTracer())) re.Equal(testCase.ShouldRun, co.ShouldRun()) } nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}} newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat)) - re.Error(tc.processRegionHeartbeat(newRegion)) + re.Error(tc.processRegionHeartbeat(newRegion, core.NewNoopHeartbeatProcessTracer())) re.Equal(7, tc.core.GetClusterNotFromStorageRegionsCnt()) } @@ -2993,12 +3002,12 @@ func TestShouldRunWithNonLeaderRegions(t *testing.T) { for _, testCase := range testCases { r := tc.GetRegion(testCase.regionID) nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat)) - re.NoError(tc.processRegionHeartbeat(nr)) + re.NoError(tc.processRegionHeartbeat(nr, core.NewNoopHeartbeatProcessTracer())) re.Equal(testCase.ShouldRun, co.ShouldRun()) } nr := &metapb.Region{Id: 9, Peers: []*metapb.Peer{}} newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat)) - re.Error(tc.processRegionHeartbeat(newRegion)) + re.Error(tc.processRegionHeartbeat(newRegion, core.NewNoopHeartbeatProcessTracer())) re.Equal(9, tc.core.GetClusterNotFromStorageRegionsCnt()) // Now, after server is prepared, there exist some regions with no leader. diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index 74a445ad78e..5ae8fdc0396 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -34,9 +34,16 @@ import ( // HandleRegionHeartbeat processes RegionInfo reports from client. 
func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { - if err := c.processRegionHeartbeat(region); err != nil { + tracer := core.NewNoopHeartbeatProcessTracer() + if c.GetScheduleConfig().EnableHeartbeatBreakdownMetrics { + tracer = core.NewHeartbeatProcessTracer() + } + tracer.Begin() + if err := c.processRegionHeartbeat(region, tracer); err != nil { + tracer.OnAllStageFinished() return err } + tracer.OnAllStageFinished() if c.IsServiceIndependent(mcsutils.SchedulingServiceName) { return nil diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go index a36e7159cfd..322ccc94d0e 100644 --- a/server/cluster/scheduling_controller.go +++ b/server/cluster/scheduling_controller.go @@ -194,6 +194,8 @@ func (sc *schedulingController) collectSchedulingMetrics() { sc.labelStats.Collect() // collect hot cache metrics sc.hotStat.CollectMetrics() + // collect the lock metrics + sc.RegionsInfo.CollectWaitLockMetrics() } func (sc *schedulingController) removeStoreStatistics(storeID uint64) { diff --git a/server/grpc_service.go b/server/grpc_service.go index 095e45775dc..b6cdce4c8b8 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1293,7 +1293,6 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error continue } start := time.Now() - err = rc.HandleRegionHeartbeat(region) if err != nil { regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "err").Inc() @@ -1301,7 +1300,6 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error s.hbStreams.SendErr(pdpb.ErrorType_UNKNOWN, msg, request.GetLeader()) continue } - regionHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds()) regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "ok").Inc()
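As in the scheduling-service path earlier, `HandleRegionHeartbeat` only builds a real tracer when `enable-heartbeat-breakdown-metrics` is on and otherwise falls back to the no-op implementation, so the instrumentation costs next to nothing when disabled. Below is a condensed sketch of that null-object gating; the `tracer` interface is a made-up trim of the real one, and `Elapsed` is a demo-only helper that the real interface does not have.

```go
package main

import (
	"fmt"
	"time"
)

// tracer is a trimmed-down stand-in for core.RegionHeartbeatProcessTracer,
// only to illustrate the noop-vs-real gating done in HandleRegionHeartbeat.
type tracer interface {
	Begin()
	OnAllStageFinished()
	Elapsed() time.Duration
}

type noopTracer struct{}

func (noopTracer) Begin()                 {}
func (noopTracer) OnAllStageFinished()    {}
func (noopTracer) Elapsed() time.Duration { return 0 }

type realTracer struct{ start, end time.Time }

func (t *realTracer) Begin()                 { t.start = time.Now() }
func (t *realTracer) OnAllStageFinished()    { t.end = time.Now() }
func (t *realTracer) Elapsed() time.Duration { return t.end.Sub(t.start) }

// newTracer mirrors the config gate: callers always receive a usable tracer,
// so the hot path needs no nil checks and the disabled case stays nearly free.
func newTracer(breakdownMetricsEnabled bool) tracer {
	if breakdownMetricsEnabled {
		return &realTracer{}
	}
	return noopTracer{}
}

func main() {
	for _, enabled := range []bool{false, true} {
		t := newTracer(enabled)
		t.Begin()
		time.Sleep(5 * time.Millisecond) // stand-in for processRegionHeartbeat
		t.OnAllStageFinished()
		fmt.Printf("enabled=%v elapsed=%v\n", enabled, t.Elapsed())
	}
}
```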
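One assumption worth calling out: the lock-wait counters are exported as deltas, and any delta larger than `defaultPollInterval` is dropped, so `CollectWaitLockMetrics` only behaves as intended if the surrounding metrics loop really polls on that fixed cadence (15s in this patch). A sketch of such a collection loop follows, with a shortened interval so the demo exits quickly; the loop shape is assumed, not copied from PD.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runMetricsLoop sketches the background collection that the new
// CollectWaitLockMetrics hook relies on: collect is invoked on a fixed tick,
// which in PD would be roughly defaultPollInterval (15s).
func runMetricsLoop(ctx context.Context, interval time.Duration, collect func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			collect() // e.g. RegionsInfo.CollectWaitLockMetrics plus the other collectors
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	runMetricsLoop(ctx, 100*time.Millisecond, func() { fmt.Println("collect tick") })
}
```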