From a17a5cfbce89385e0abc4c67a27581170eee036c Mon Sep 17 00:00:00 2001 From: nolouch Date: Thu, 7 Mar 2024 15:33:17 +0800 Subject: [PATCH] fix Signed-off-by: nolouch --- pkg/core/metrics.go | 33 ++++++++++++++++++---------- pkg/mcs/scheduling/server/cluster.go | 1 + server/cluster/cluster.go | 24 ++++++++------------ 3 files changed, 32 insertions(+), 26 deletions(-) diff --git a/pkg/core/metrics.go b/pkg/core/metrics.go index bd23605e563b..e6f3535b1d74 100644 --- a/pkg/core/metrics.go +++ b/pkg/core/metrics.go @@ -77,6 +77,8 @@ var ( setRegionCount = HeartbeatBreakdownHandleCount.WithLabelValues("SaveCache_SetRegion") updateSubTreeDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("SaveCache_UpdateSubTree") updateSubTreeCount = HeartbeatBreakdownHandleCount.WithLabelValues("SaveCache_UpdateSubTree") + regionCollectDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("CollectRegionStats") + regionCollectCount = HeartbeatBreakdownHandleCount.WithLabelValues("CollectRegionStats") otherDurationSum = HeartbeatBreakdownHandleDurationSum.WithLabelValues("Other") otherCount = HeartbeatBreakdownHandleCount.WithLabelValues("Other") ) @@ -109,6 +111,7 @@ type RegionHeartbeatProcessTracer interface { OnValidateRegionFinished() OnSetRegionFinished() OnUpdateSubTreeFinished() + OnCollectRegionStatsFinished() OnAllStageFinished() LogFields() []zap.Field } @@ -120,17 +123,18 @@ func NewNoopHeartbeatProcessTracer() RegionHeartbeatProcessTracer { return &noopHeartbeatProcessTracer{} } -func (n *noopHeartbeatProcessTracer) Begin() {} -func (n *noopHeartbeatProcessTracer) OnPreCheckFinished() {} -func (n *noopHeartbeatProcessTracer) OnAsyncHotStatsFinished() {} -func (n *noopHeartbeatProcessTracer) OnRegionGuideFinished() {} -func (n *noopHeartbeatProcessTracer) OnSaveCacheBegin() {} -func (n *noopHeartbeatProcessTracer) OnSaveCacheFinished() {} -func (n *noopHeartbeatProcessTracer) OnCheckOverlapsFinished() {} -func (n *noopHeartbeatProcessTracer) OnValidateRegionFinished() {} -func (n *noopHeartbeatProcessTracer) OnSetRegionFinished() {} -func (n *noopHeartbeatProcessTracer) OnUpdateSubTreeFinished() {} -func (n *noopHeartbeatProcessTracer) OnAllStageFinished() {} +func (n *noopHeartbeatProcessTracer) Begin() {} +func (n *noopHeartbeatProcessTracer) OnPreCheckFinished() {} +func (n *noopHeartbeatProcessTracer) OnAsyncHotStatsFinished() {} +func (n *noopHeartbeatProcessTracer) OnRegionGuideFinished() {} +func (n *noopHeartbeatProcessTracer) OnSaveCacheBegin() {} +func (n *noopHeartbeatProcessTracer) OnSaveCacheFinished() {} +func (n *noopHeartbeatProcessTracer) OnCheckOverlapsFinished() {} +func (n *noopHeartbeatProcessTracer) OnValidateRegionFinished() {} +func (n *noopHeartbeatProcessTracer) OnSetRegionFinished() {} +func (n *noopHeartbeatProcessTracer) OnUpdateSubTreeFinished() {} +func (n *noopHeartbeatProcessTracer) OnCollectRegionStatsFinished() {} +func (n *noopHeartbeatProcessTracer) OnAllStageFinished() {} func (n *noopHeartbeatProcessTracer) LogFields() []zap.Field { return nil } @@ -192,6 +196,13 @@ func (h *regionHeartbeatProcessTracer) OnSaveCacheFinished() { h.lastCheckTime = time.Now() } +func (h *regionHeartbeatProcessTracer) OnCollectRegionStatsFinished() { + now := time.Now() + regionCollectDurationSum.Add(now.Sub(h.lastCheckTime).Seconds()) + regionCollectCount.Inc() + h.lastCheckTime = now +} + func (h *regionHeartbeatProcessTracer) OnCheckOverlapsFinished() { now := time.Now() h.saveCacheStats.checkOverlapsDuration = now.Sub(h.lastCheckTime) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 273b57547ece..1b915b6874d2 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -592,6 +592,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo, tracer core.Re } tracer.OnSaveCacheFinished() cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats) + tracer.OnCollectRegionStatsFinished() return nil } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index f87fab36f8c6..4597124a5ae9 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -990,9 +990,9 @@ func (c *RaftCluster) processReportBuckets(buckets *metapb.Buckets) error { var regionGuide = core.GenerateRegionGuideFunc(true) // processRegionHeartbeat updates the region information. -func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo, trace core.RegionHeartbeatProcessTracer) error { - origin, _, err := c.core.PreCheckPutRegion(region, trace) - trace.OnPreCheckFinished() +func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo, tracer core.RegionHeartbeatProcessTracer) error { + origin, _, err := c.core.PreCheckPutRegion(region, tracer) + tracer.OnPreCheckFinished() if err != nil { return err } @@ -1002,12 +1002,12 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo, trace core if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { cluster.HandleStatsAsync(c, region) } - trace.OnAsyncHotStatsFinished() + tracer.OnAsyncHotStatsFinished() hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. saveKV, saveCache, needSync := regionGuide(region, origin) - trace.OnRegionGuideFinished() + tracer.OnRegionGuideFinished() if !saveKV && !saveCache { // Due to some config changes need to update the region stats as well, // so we do some extra checks here. @@ -1022,7 +1022,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo, trace core failpoint.Inject("concurrentRegionHeartbeat", func() { time.Sleep(500 * time.Millisecond) }) - trace.OnSaveCacheBegin() + tracer.OnSaveCacheBegin() var overlaps []*core.RegionInfo if saveCache { failpoint.Inject("decEpoch", func() { @@ -1033,7 +1033,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo, trace core // // However, it can't solve the race condition of concurrent heartbeats from the same region. if overlaps, err = c.core.AtomicCheckAndPutRegion(region, trace); err != nil { - trace.OnSaveCacheFinished() + tracer.OnSaveCacheFinished() return err } if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { @@ -1041,19 +1041,13 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo, trace core } regionUpdateCacheEventCounter.Inc() } -<<<<<<< HEAD - trace.OnSaveCacheFinished() - if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { - cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats) - } -======= + tracer.OnSaveCacheFinished() // TODO: Due to the accuracy requirements of the API "/regions/check/xxx", // region stats needs to be collected in API mode. // We need to think of a better way to reduce this part of the cost in the future. cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats) ->>>>>>> origin/master - + tracer.OnCollectRegionStatsFinished() if c.storage != nil { // If there are concurrent heartbeats from the same region, the last write will win even if // writes to storage in the critical area. So don't use mutex to protect it.