Skip to content

Commit

Permalink
heartbeat: reduce unnecessary RunTask (tikv#8559)
Browse files Browse the repository at this point in the history
ref tikv#7897

Signed-off-by: okJiang <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
okJiang and ti-chi-bot[bot] committed Aug 22, 2024
1 parent 8e45f9e commit 84138e4
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 38 deletions.
32 changes: 15 additions & 17 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,20 @@ func HandleOverlaps(ctx context.Context, c Cluster, overlaps []*core.RegionInfo)
}

// Collect collects the cluster information.
func Collect(ctx context.Context, c Cluster, region *core.RegionInfo, hasRegionStats bool) {
if hasRegionStats {
// get region again from root tree. make sure the observed region is the latest.
bc := c.GetBasicCluster()
if bc == nil {
return
}
region = bc.GetRegion(region.GetID())
if region == nil {
return
}
select {
case <-ctx.Done():
return
default:
}
c.GetRegionStats().Observe(region, c.GetBasicCluster().GetRegionStores(region))
func Collect(ctx context.Context, c Cluster, region *core.RegionInfo) {
// get region again from root tree. make sure the observed region is the latest.
bc := c.GetBasicCluster()
if bc == nil {
return
}
region = bc.GetRegion(region.GetID())
if region == nil {
return
}
select {
case <-ctx.Done():
return
default:
}
c.GetRegionStats().Observe(region, c.GetBasicCluster().GetRegionStores(region))
}
21 changes: 12 additions & 9 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
regionID,
ratelimit.ObserveRegionStatsAsync,
func(ctx context.Context) {
cluster.Collect(ctx, c, region, hasRegionStats)
cluster.Collect(ctx, c, region)
},
)
}
Expand Down Expand Up @@ -679,14 +679,17 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
)
}
tracer.OnSaveCacheFinished()
// handle region stats
ctx.TaskRunner.RunTask(
regionID,
ratelimit.CollectRegionStatsAsync,
func(ctx context.Context) {
cluster.Collect(ctx, c, region, hasRegionStats)
},
)
if hasRegionStats {
// handle region stats
ctx.TaskRunner.RunTask(
regionID,
ratelimit.CollectRegionStatsAsync,
func(ctx context.Context) {
cluster.Collect(ctx, c, region)
},
)
}

tracer.OnCollectRegionStatsFinished()
return nil
}
Expand Down
26 changes: 14 additions & 12 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
regionID,
ratelimit.ObserveRegionStatsAsync,
func(ctx context.Context) {
cluster.Collect(ctx, c, region, hasRegionStats)
cluster.Collect(ctx, c, region)
},
)
}
Expand Down Expand Up @@ -1119,17 +1119,19 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
}

tracer.OnSaveCacheFinished()
// handle region stats
ctx.MiscRunner.RunTask(
regionID,
ratelimit.CollectRegionStatsAsync,
func(ctx context.Context) {
// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
// region stats needs to be collected in API mode.
// We need to think of a better way to reduce this part of the cost in the future.
cluster.Collect(ctx, c, region, hasRegionStats)
},
)
if hasRegionStats {
// handle region stats
ctx.MiscRunner.RunTask(
regionID,
ratelimit.CollectRegionStatsAsync,
func(ctx context.Context) {
// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
// region stats needs to be collected in API mode.
// We need to think of a better way to reduce this part of the cost in the future.
cluster.Collect(ctx, c, region)
},
)
}

tracer.OnCollectRegionStatsFinished()
if c.storage != nil {
Expand Down

0 comments on commit 84138e4

Please sign in to comment.