Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: optimize memory usage #8164

Merged
merged 8 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,25 @@ type Cluster interface {

// HandleStatsAsync handles the flow asynchronously.
func HandleStatsAsync(c Cluster, region *core.RegionInfo) {
c.GetHotStat().CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
c.GetHotStat().CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
c.GetHotStat().CheckWriteAsync(statistics.NewCheckWritePeerTask(region))
checkWritePeerTask := func(cache *statistics.HotPeerCache) {
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
stats := cache.CheckPeerFlow(region, region.GetPeers(), region.GetWriteLoads(), interval)
for _, stat := range stats {
cache.UpdateStat(stat)
}
}

checkExpiredTask := func(cache *statistics.HotPeerCache) {
expiredStats := cache.CollectExpiredItems(region)
for _, stat := range expiredStats {
cache.UpdateStat(stat)
}
}

c.GetHotStat().CheckWriteAsync(checkExpiredTask)
c.GetHotStat().CheckReadAsync(checkExpiredTask)
c.GetHotStat().CheckWriteAsync(checkWritePeerTask)
c.GetCoordinator().GetSchedulersController().CheckTransferWitnessLeader(region)
}

Expand Down
91 changes: 37 additions & 54 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ func errRegionIsStale(region *metapb.Region, origin *metapb.Region) error {
// the properties are Read-Only once created except buckets.
// the `buckets` could be modified by the request `report buckets` with greater version.
type RegionInfo struct {
term uint64
meta *metapb.Region
learners []*metapb.Peer
witnesses []*metapb.Peer
voters []*metapb.Peer
leader *metapb.Peer
downPeers []*pdpb.PeerStats
pendingPeers []*metapb.Peer
term uint64
cpuUsage uint64
writtenBytes uint64
writtenKeys uint64
Expand Down Expand Up @@ -136,26 +136,22 @@ func NewRegionInfo(region *metapb.Region, leader *metapb.Peer, opts ...RegionCre

// classifyVoterAndLearner sorts out voter and learner from peers into different slice.
func classifyVoterAndLearner(region *RegionInfo) {
learners := make([]*metapb.Peer, 0, 1)
voters := make([]*metapb.Peer, 0, len(region.meta.Peers))
witnesses := make([]*metapb.Peer, 0, 1)
region.learners = make([]*metapb.Peer, 0, 1)
region.voters = make([]*metapb.Peer, 0, len(region.meta.Peers))
region.witnesses = make([]*metapb.Peer, 0, 1)
for _, p := range region.meta.Peers {
if IsLearner(p) {
learners = append(learners, p)
region.learners = append(region.learners, p)
} else {
voters = append(voters, p)
region.voters = append(region.voters, p)
}
// Whichever peer role can be a witness
if IsWitness(p) {
witnesses = append(witnesses, p)
region.witnesses = append(region.witnesses, p)
}
}
sort.Sort(peerSlice(learners))
sort.Sort(peerSlice(voters))
sort.Sort(peerSlice(witnesses))
region.learners = learners
region.voters = voters
region.witnesses = witnesses
sort.Sort(peerSlice(region.learners))
sort.Sort(peerSlice(region.voters))
sort.Sort(peerSlice(region.witnesses))
}

// peersEqualTo returns true when the peers are not changed, which may caused by: the region leader not changed,
Expand Down Expand Up @@ -213,7 +209,7 @@ type RegionHeartbeatRequest interface {
}

// RegionFromHeartbeat constructs a Region from region heartbeat.
func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo {
func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, flowRoundDivisor int) *RegionInfo {
// Convert unit to MB.
// If region isn't empty and less than 1MB, use 1MB instead.
// The size of empty region will be correct by the previous RegionInfo.
Expand All @@ -223,20 +219,21 @@ func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateO
}

region := &RegionInfo{
term: heartbeat.GetTerm(),
meta: heartbeat.GetRegion(),
leader: heartbeat.GetLeader(),
downPeers: heartbeat.GetDownPeers(),
pendingPeers: heartbeat.GetPendingPeers(),
writtenBytes: heartbeat.GetBytesWritten(),
writtenKeys: heartbeat.GetKeysWritten(),
readBytes: heartbeat.GetBytesRead(),
readKeys: heartbeat.GetKeysRead(),
approximateSize: int64(regionSize),
approximateKeys: int64(heartbeat.GetApproximateKeys()),
interval: heartbeat.GetInterval(),
queryStats: heartbeat.GetQueryStats(),
source: Heartbeat,
term: heartbeat.GetTerm(),
meta: heartbeat.GetRegion(),
leader: heartbeat.GetLeader(),
downPeers: heartbeat.GetDownPeers(),
pendingPeers: heartbeat.GetPendingPeers(),
writtenBytes: heartbeat.GetBytesWritten(),
writtenKeys: heartbeat.GetKeysWritten(),
readBytes: heartbeat.GetBytesRead(),
readKeys: heartbeat.GetKeysRead(),
approximateSize: int64(regionSize),
approximateKeys: int64(heartbeat.GetApproximateKeys()),
interval: heartbeat.GetInterval(),
queryStats: heartbeat.GetQueryStats(),
source: Heartbeat,
flowRoundDivisor: uint64(flowRoundDivisor),
}

// scheduling service doesn't need the following fields.
Expand All @@ -246,10 +243,6 @@ func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, opts ...RegionCreateO
region.cpuUsage = h.GetCpuUsage()
}

for _, opt := range opts {
opt(region)
}

if region.writtenKeys >= ImpossibleFlowSize || region.writtenBytes >= ImpossibleFlowSize {
region.writtenKeys = 0
region.writtenBytes = 0
Expand Down Expand Up @@ -957,11 +950,11 @@ func (r *RegionsInfo) getRegionLocked(regionID uint64) *RegionInfo {
func (r *RegionsInfo) CheckAndPutRegion(region *RegionInfo) []*RegionInfo {
r.t.Lock()
origin := r.getRegionLocked(region.GetID())
var ols []*regionItem
var ols []*RegionInfo
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
ols = r.tree.overlaps(&regionItem{RegionInfo: region})
}
err := check(region, origin, convertItemsToRegions(ols))
err := check(region, origin, ols)
if err != nil {
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
// return the state region to delete.
Expand All @@ -988,25 +981,17 @@ func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*Reg
return origin, overlaps, err
}

func convertItemsToRegions(items []*regionItem) []*RegionInfo {
regions := make([]*RegionInfo, 0, len(items))
for _, item := range items {
regions = append(regions, item.RegionInfo)
}
return regions
}

// AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put.
func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) {
tracer := ctx.Tracer
r.t.Lock()
var ols []*regionItem
var ols []*RegionInfo
origin := r.getRegionLocked(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
ols = r.tree.overlaps(&regionItem{RegionInfo: region})
}
tracer.OnCheckOverlapsFinished()
err := check(region, origin, convertItemsToRegions(ols))
err := check(region, origin, ols)
if err != nil {
r.t.Unlock()
tracer.OnValidateRegionFinished()
Expand All @@ -1026,13 +1011,13 @@ func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx *MetaProcessContext, region *R
func (r *RegionsInfo) CheckAndPutRootTree(ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) {
tracer := ctx.Tracer
r.t.Lock()
var ols []*regionItem
var ols []*RegionInfo
origin := r.getRegionLocked(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
ols = r.tree.overlaps(&regionItem{RegionInfo: region})
}
tracer.OnCheckOverlapsFinished()
err := check(region, origin, convertItemsToRegions(ols))
err := check(region, origin, ols)
if err != nil {
r.t.Unlock()
tracer.OnValidateRegionFinished()
Expand Down Expand Up @@ -1123,7 +1108,7 @@ func (r *RegionsInfo) updateSubTreeLocked(rangeChanged bool, overlaps []*RegionI
if len(overlaps) == 0 {
// If the range has changed but the overlapped regions are not provided, collect them by `[]*regionItem`.
for _, item := range r.getOverlapRegionFromOverlapTreeLocked(region) {
r.removeRegionFromSubTreeLocked(item.RegionInfo)
r.removeRegionFromSubTreeLocked(item)
}
} else {
// Remove all provided overlapped regions from the subtrees.
Expand Down Expand Up @@ -1164,7 +1149,7 @@ func (r *RegionsInfo) updateSubTreeLocked(rangeChanged bool, overlaps []*RegionI
setPeers(r.pendingPeers, region.GetPendingPeers())
}

func (r *RegionsInfo) getOverlapRegionFromOverlapTreeLocked(region *RegionInfo) []*regionItem {
func (r *RegionsInfo) getOverlapRegionFromOverlapTreeLocked(region *RegionInfo) []*RegionInfo {
return r.overlapTree.overlaps(&regionItem{RegionInfo: region})
}

Expand All @@ -1174,9 +1159,7 @@ func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo) (origin *RegionInfo
defer r.t.RUnlock()
origin = r.getRegionLocked(region.GetID())
if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
for _, item := range r.tree.overlaps(&regionItem{RegionInfo: region}) {
overlaps = append(overlaps, item.RegionInfo)
}
return origin, r.tree.overlaps(&regionItem{RegionInfo: region})
}
return
}
Expand Down Expand Up @@ -1211,7 +1194,7 @@ func (r *RegionsInfo) SetRegion(region *RegionInfo) (*RegionInfo, []*RegionInfo,
return r.setRegionLocked(region, false)
}

func (r *RegionsInfo) setRegionLocked(region *RegionInfo, withOverlaps bool, ol ...*regionItem) (*RegionInfo, []*RegionInfo, bool) {
func (r *RegionsInfo) setRegionLocked(region *RegionInfo, withOverlaps bool, ol ...*RegionInfo) (*RegionInfo, []*RegionInfo, bool) {
var (
item *regionItem // Pointer to the *RegionInfo of this ID.
origin *RegionInfo
Expand Down Expand Up @@ -1311,7 +1294,7 @@ func (r *RegionsInfo) TreeLen() int {
}

// GetOverlaps returns the regions which are overlapped with the specified region range.
func (r *RegionsInfo) GetOverlaps(region *RegionInfo) []*regionItem {
func (r *RegionsInfo) GetOverlaps(region *RegionInfo) []*RegionInfo {
r.t.RLock()
defer r.t.RUnlock()
return r.tree.overlaps(&regionItem{RegionInfo: region})
Expand Down
8 changes: 5 additions & 3 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,18 +156,19 @@ func TestSortedEqual(t *testing.T) {
re.Equal(testCase.isEqual, SortedPeersEqual(regionA.GetVoters(), regionB.GetVoters()))
}

flowRoundDivisor := 3
// test RegionFromHeartbeat
for _, testCase := range testCases {
regionA := RegionFromHeartbeat(&pdpb.RegionHeartbeatRequest{
Region: &metapb.Region{Id: 100, Peers: pickPeers(testCase.idsA)},
DownPeers: pickPeerStats(testCase.idsA),
PendingPeers: pickPeers(testCase.idsA),
})
}, flowRoundDivisor)
regionB := RegionFromHeartbeat(&pdpb.RegionHeartbeatRequest{
Region: &metapb.Region{Id: 100, Peers: pickPeers(testCase.idsB)},
DownPeers: pickPeerStats(testCase.idsB),
PendingPeers: pickPeers(testCase.idsB),
})
}, flowRoundDivisor)
re.Equal(testCase.isEqual, SortedPeersEqual(regionA.GetVoters(), regionB.GetVoters()))
re.Equal(testCase.isEqual, SortedPeersEqual(regionA.GetVoters(), regionB.GetVoters()))
re.Equal(testCase.isEqual, SortedPeersEqual(regionA.GetPendingPeers(), regionB.GetPendingPeers()))
Expand Down Expand Up @@ -950,9 +951,10 @@ func BenchmarkRegionFromHeartbeat(b *testing.B) {
PendingPeers: []*metapb.Peer{peers[1]},
DownPeers: []*pdpb.PeerStats{{Peer: peers[2], DownSeconds: 100}},
}
flowRoundDivisor := 3
b.ResetTimer()
for i := 0; i < b.N; i++ {
RegionFromHeartbeat(regionReq)
RegionFromHeartbeat(regionReq, flowRoundDivisor)
}
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (t *regionTree) notFromStorageRegionsCount() int {
}

// GetOverlaps returns the range items that has some intersections with the given items.
func (t *regionTree) overlaps(item *regionItem) []*regionItem {
func (t *regionTree) overlaps(item *regionItem) []*RegionInfo {
// note that Find() gets the last item that is less or equal than the item.
// in the case: |_______a_______|_____b_____|___c___|
// new item is |______d______|
Expand All @@ -116,12 +116,12 @@ func (t *regionTree) overlaps(item *regionItem) []*regionItem {
result = item
}
endKey := item.GetEndKey()
var overlaps []*regionItem
var overlaps []*RegionInfo
t.tree.AscendGreaterOrEqual(result, func(i *regionItem) bool {
if len(endKey) > 0 && bytes.Compare(endKey, i.GetStartKey()) <= 0 {
return false
}
overlaps = append(overlaps, i)
overlaps = append(overlaps, i.RegionInfo)
return true
})
return overlaps
Expand All @@ -130,7 +130,7 @@ func (t *regionTree) overlaps(item *regionItem) []*regionItem {
// update updates the tree with the region.
// It finds and deletes all the overlapped regions first, and then
// insert the region.
func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*regionItem) []*RegionInfo {
func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*RegionInfo) []*RegionInfo {
region := item.RegionInfo
t.totalSize += region.approximateSize
regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
Expand All @@ -145,15 +145,15 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
}

for _, old := range overlaps {
t.tree.Delete(old)
t.tree.Delete(&regionItem{RegionInfo: old})
}
t.tree.ReplaceOrInsert(item)
if t.countRef {
item.RegionInfo.IncRef()
}
result := make([]*RegionInfo, len(overlaps))
for i, overlap := range overlaps {
old := overlap.RegionInfo
old := overlap
result[i] = old
log.Debug("overlapping region",
zap.Uint64("region-id", old.GetID()),
Expand All @@ -174,7 +174,7 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
return result
}

// updateStat is used to update statistics when regionItem.RegionInfo is directly replaced.
// updateStat is used to update statistics when RegionInfo is directly replaced.
func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) {
t.totalSize += region.approximateSize
regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/region_tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ func TestRegionTree(t *testing.T) {
updateNewItem(tree, regionA)
updateNewItem(tree, regionC)
re.Nil(tree.overlaps(newRegionItem([]byte("b"), []byte("c"))))
re.Equal(regionC, tree.overlaps(newRegionItem([]byte("c"), []byte("d")))[0].RegionInfo)
re.Equal(regionC, tree.overlaps(newRegionItem([]byte("a"), []byte("cc")))[1].RegionInfo)
re.Equal(regionC, tree.overlaps(newRegionItem([]byte("c"), []byte("d")))[0])
re.Equal(regionC, tree.overlaps(newRegionItem([]byte("a"), []byte("cc")))[1])
re.Nil(tree.search([]byte{}))
re.Equal(regionA, tree.search([]byte("a")))
re.Nil(tree.search([]byte("b")))
Expand Down
16 changes: 14 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,11 +443,23 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq
utils.RegionWriteKeys: 0,
utils.RegionWriteQueryNum: 0,
}
c.hotStat.CheckReadAsync(statistics.NewCheckReadPeerTask(region, []*metapb.Peer{peer}, loads, interval))
checkReadPeerTask := func(cache *statistics.HotPeerCache) {
stats := cache.CheckPeerFlow(region, []*metapb.Peer{peer}, loads, interval)
for _, stat := range stats {
cache.UpdateStat(stat)
}
}
c.hotStat.CheckReadAsync(checkReadPeerTask)
}

// Here we will compare the reported regions with the previous hot peers to decide if it is still hot.
c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval))
collectUnReportedPeerTask := func(cache *statistics.HotPeerCache) {
stats := cache.CheckColdPeer(storeID, regions, interval)
for _, stat := range stats {
cache.UpdateStat(stat)
}
}
c.hotStat.CheckReadAsync(collectUnReportedPeerTask)
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat
s.hbStreams.BindStream(storeID, server)
lastBind = time.Now()
}
region := core.RegionFromHeartbeat(request)
// scheduling service doesn't sync the pd server config, so we use 0 here
region := core.RegionFromHeartbeat(request, 0)
err = c.HandleRegionHeartbeat(region)
if err != nil {
// TODO: if we need to send the error back to API server.
Expand Down
Loading