*: fix region stats check (#7748) #7976

Merged
server/api/region_test.go (1 addition, 1 deletion)
@@ -222,7 +222,7 @@ func (suite *regionTestSuite) TestRegionCheck() {
r7 := make([]*histItem, 1)
suite.NoError(tu.ReadGetJSON(re, testDialClient, url, &r7))
histKeys := []*histItem{{Start: 1000, End: 1999, Count: 1}}
suite.Equal(histKeys, r7)
re.Equal(histKeys, r7)
}

func (suite *regionTestSuite) TestRegions() {
server/cluster/cluster.go (2 additions, 3 deletions)
@@ -858,9 +858,8 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
hasRegionStats := c.regionStats != nil
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
isNew, saveKV, saveCache, needSync := regionGuide(region, origin)
if !saveKV && !saveCache && !isNew {
saveKV, saveCache, needSync := regionGuide(region, origin)
if !saveKV && !saveCache {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
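
With the isNew flag gone, processRegionHeartbeat short-circuits purely on saveKV/saveCache and relies on the statistics collector itself to say whether the region still needs to be (re)recorded. The following is a minimal, self-contained sketch of that control flow; statsIndex, needUpdate and handleHeartbeat are hand-rolled illustrations under simplified types, not PD's actual API:

package main

import "fmt"

// statsIndex is a stand-in for RegionStatistics: it only remembers which
// regions it has already observed.
type statsIndex struct {
	observed map[uint64]bool
}

// needUpdate mimics the spirit of RegionStatsNeedUpdate after this PR: a
// region that was never observed always needs an update; otherwise further
// checks (oversized, undersized, and so on) would apply.
func (s *statsIndex) needUpdate(regionID uint64) bool {
	return !s.observed[regionID]
}

func (s *statsIndex) observe(regionID uint64) { s.observed[regionID] = true }

// handleHeartbeat mirrors the reshaped check above: no isNew flag, just
// saveKV/saveCache plus the statistics' own staleness test.
func handleHeartbeat(regionID uint64, saveKV, saveCache bool, s *statsIndex) {
	if !saveKV && !saveCache {
		if s.needUpdate(regionID) {
			s.observe(regionID)
			fmt.Printf("region %d: stats refreshed without a cache update\n", regionID)
		}
		return
	}
	s.observe(regionID)
	fmt.Printf("region %d: cache/KV saved and stats refreshed\n", regionID)
}

func main() {
	s := &statsIndex{observed: map[uint64]bool{}}
	handleHeartbeat(10, false, false, s) // first heartbeat: stats are still recorded
	handleHeartbeat(10, false, false, s) // nothing changed: the heartbeat is skipped
}
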
server/core/region.go (15 additions, 13 deletions)
@@ -93,6 +93,12 @@
return r.source == Storage
}

// LoadedFromSync means this region's meta info loaded from region syncer.
// Only used for test.
func (r *RegionInfo) LoadedFromSync() bool {
return r.source == Sync
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
func NewRegionInfo(region *metapb.Region, leader *metapb.Peer, opts ...RegionCreateOption) *RegionInfo {
regionInfo := &RegionInfo{
@@ -626,7 +632,7 @@

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool)
type RegionGuideFunc func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool)

// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
// nil means do not print the log.
@@ -639,13 +645,14 @@
}
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
return func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) {
return func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool) {
if origin == nil {
debug("insert new region",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
saveKV, saveCache, isNew = true, true, true
if log.GetLevel() <= zap.DebugLevel {
debug("insert new region",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))

[Codecov / codecov/patch: added lines server/core/region.go#L651-L653 were not covered by tests]
}
saveKV, saveCache = true, true
} else {
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
@@ -668,9 +675,7 @@
saveKV, saveCache = true, true
}
if region.GetLeader().GetId() != origin.GetLeader().GetId() {
if origin.GetLeader().GetId() == 0 {
isNew = true
} else {
if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel {
info("leader changed",
zap.Uint64("region-id", region.GetID()),
zap.Uint64("from", origin.GetLeader().GetStoreId()),
@@ -718,9 +723,6 @@
if region.IsFlashbackChanged(origin) {
saveCache = true
}
if origin.LoadedFromStorage() {
isNew = true
}
}
return
}
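
The guide function now returns only three flags, and its logging is gated on the logger's level so the redacted meta fields are not built when they would be thrown away. Below is a compact sketch of both ideas; regionInfo and guide are illustrative hand-rolled names, and only the three-value shape and the level check mirror the diff above:

package main

import (
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

// regionInfo is a tiny stand-in for core.RegionInfo.
type regionInfo struct {
	id       uint64
	epochVer uint64
	leaderID uint64
}

var logger, _ = zap.NewProduction()

// guide mirrors the new three-value shape of RegionGuideFunc: isNew is gone,
// and the debug log for a brand-new region is only built when debug is enabled.
func guide(region, origin *regionInfo) (saveKV, saveCache, needSync bool) {
	if origin == nil {
		if logger.Core().Enabled(zapcore.DebugLevel) {
			logger.Debug("insert new region", zap.Uint64("region-id", region.id))
		}
		return true, true, false
	}
	if region.epochVer > origin.epochVer {
		saveKV, saveCache = true, true
	}
	if region.leaderID != origin.leaderID {
		saveCache, needSync = true, true
	}
	return
}

func main() {
	saveKV, saveCache, needSync := guide(&regionInfo{id: 10, epochVer: 2, leaderID: 101}, nil)
	logger.Info("guide result",
		zap.Bool("save-kv", saveKV), zap.Bool("save-cache", saveCache), zap.Bool("need-sync", needSync))
}
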
server/core/region_test.go (1 addition, 1 deletion)
@@ -361,7 +361,7 @@ func TestNeedSync(t *testing.T) {
for _, testCase := range testCases {
regionA := region.Clone(testCase.optionsA...)
regionB := region.Clone(testCase.optionsB...)
_, _, _, needSync := RegionGuide(regionA, regionB)
_, _, needSync := RegionGuide(regionA, regionB)
re.Equal(testCase.needSync, needSync)
}
}
server/region_syncer/client.go (1 addition, 1 deletion)
@@ -218,7 +218,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
continue
}
_, saveKV, _, _ := regionGuide(region, origin)
saveKV, _, _ := regionGuide(region, origin)
overlaps := bc.PutRegion(region)

if hasBuckets {
server/statistics/region_collection.go (21 additions, 4 deletions)
@@ -27,6 +27,8 @@ import (
// RegionStatisticType represents the type of the region's status.
type RegionStatisticType uint32

const emptyStatistic = RegionStatisticType(0)

// region status type
const (
MissPeer RegionStatisticType = 1 << iota
@@ -140,6 +142,9 @@ func (r *RegionStatistics) deleteOfflineEntry(deleteIndex RegionStatisticType, r
// due to some special state types.
func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool {
regionID := region.GetID()
if !r.isObserved(regionID) {
return true
}
if r.IsRegionStatsType(regionID, OversizedRegion) !=
region.IsOversized(int64(r.storeConfigManager.GetStoreConfig().GetRegionMaxSize()), int64(r.storeConfigManager.GetStoreConfig().GetRegionMaxKeys())) {
return true
@@ -148,6 +153,14 @@ func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool {
region.NeedMerge(int64(r.opt.GetMaxMergeRegionSize()), int64(r.opt.GetMaxMergeRegionKeys()))
}

// isObserved returns whether the region is observed. And it also shows whether PD received heartbeat of this region.
func (r *RegionStatistics) isObserved(id uint64) bool {
r.RLock()
defer r.RUnlock()
_, ok := r.index[id]
return ok
}

// Observe records the current regions' status.
func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.StoreInfo) {
r.Lock()
@@ -251,10 +264,11 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store
r.deleteOfflineEntry(deleteIndex, regionID)
r.offlineIndex[regionID] = offlinePeerTypeIndex

if oldIndex, ok := r.index[regionID]; ok {
deleteIndex = oldIndex &^ peerTypeIndex
// Remove the info if any of the conditions are not met any more.
if oldIndex, ok := r.index[regionID]; ok && oldIndex > emptyStatistic {
deleteIndex := oldIndex &^ peerTypeIndex
r.deleteEntry(deleteIndex, regionID)
}
r.deleteEntry(deleteIndex, regionID)
r.index[regionID] = peerTypeIndex
}

@@ -263,7 +277,10 @@ func (r *RegionStatistics) ClearDefunctRegion(regionID uint64) {
r.Lock()
defer r.Unlock()
if oldIndex, ok := r.index[regionID]; ok {
r.deleteEntry(oldIndex, regionID)
delete(r.index, regionID)
if oldIndex > emptyStatistic {
r.deleteEntry(oldIndex, regionID)
}
}
if oldIndex, ok := r.offlineIndex[regionID]; ok {
r.deleteOfflineEntry(oldIndex, regionID)
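
Two behaviours change in this file: a region PD has never indexed is now always treated as needing a stats update (isObserved), and delete bookkeeping only runs when the recorded mask is above emptyStatistic. A small self-contained model of that bookkeeping follows; statType, regionStats, needUpdate and observe are my own illustrative names, not PD's RegionStatistics API:

package main

import "fmt"

type statType uint32

const empty = statType(0)

const (
	missPeer statType = 1 << iota
	oversized
)

type regionStats struct {
	index map[uint64]statType          // region id -> current statistic mask
	stats map[statType]map[uint64]bool // per-type membership
}

func newRegionStats() *regionStats {
	return &regionStats{
		index: map[uint64]statType{},
		stats: map[statType]map[uint64]bool{missPeer: {}, oversized: {}},
	}
}

// needUpdate mirrors the new isObserved check: a region that was never
// recorded always needs an update, otherwise only a flipped condition does.
func (r *regionStats) needUpdate(id uint64, nowOversized bool) bool {
	old, observed := r.index[id]
	if !observed {
		return true
	}
	return (old&oversized != 0) != nowOversized
}

// observe records the region's current mask, dropping only the per-type
// entries it no longer matches, and only when a non-empty mask was stored.
func (r *regionStats) observe(id uint64, mask statType) {
	if old, ok := r.index[id]; ok && old > empty {
		for _, t := range []statType{missPeer, oversized} {
			if old&t != 0 && mask&t == 0 {
				delete(r.stats[t], id)
			}
		}
	}
	for _, t := range []statType{missPeer, oversized} {
		if mask&t != 0 {
			r.stats[t][id] = true
		}
	}
	r.index[id] = mask
}

func main() {
	rs := newRegionStats()
	fmt.Println(rs.needUpdate(10, false)) // true: never observed, even with an empty mask
	rs.observe(10, empty)
	fmt.Println(rs.needUpdate(10, false)) // false: observed and nothing changed
	fmt.Println(rs.needUpdate(10, true))  // true: it became oversized
}
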
tests/server/cluster/cluster_test.go (94 additions, 0 deletions)
@@ -44,6 +44,7 @@ import (
syncer "github.com/tikv/pd/server/region_syncer"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/statistics"
"github.com/tikv/pd/server/storage"
"github.com/tikv/pd/server/tso"
"github.com/tikv/pd/tests"
@@ -180,6 +181,99 @@ func TestDamagedRegion(t *testing.T) {
re.Equal(uint64(1), rc.GetOperatorController().OperatorCount(operator.OpAdmin))
}

func TestRegionStatistics(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tc, err := tests.NewTestCluster(ctx, 2)
defer tc.Destroy()
re.NoError(err)

err = tc.RunInitialServers()
re.NoError(err)

leaderName := tc.WaitLeader()
leaderServer := tc.GetServer(leaderName)
grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr())
clusterID := leaderServer.GetClusterID()
bootstrapCluster(re, clusterID, grpcPDClient)
rc := leaderServer.GetRaftCluster()

region := &metapb.Region{
Id: 10,
StartKey: []byte("abc"),
EndKey: []byte("xyz"),
Peers: []*metapb.Peer{
{Id: 101, StoreId: 1},
{Id: 102, StoreId: 2},
{Id: 103, StoreId: 3},
{Id: 104, StoreId: 4, Role: metapb.PeerRole_Learner},
},
}

// To put region.
regionInfo := core.NewRegionInfo(region, region.Peers[0], core.SetApproximateSize(0))
err = tc.HandleRegionHeartbeat(regionInfo)
re.NoError(err)
regions := rc.GetRegionStatsByType(statistics.LearnerPeer)
re.Len(regions, 1)

// wait for sync region
time.Sleep(1000 * time.Millisecond)

leaderServer.ResignLeader()
newLeaderName := tc.WaitLeader()
re.NotEqual(newLeaderName, leaderName)
leaderServer = tc.GetServer(newLeaderName)
rc = leaderServer.GetRaftCluster()
r := rc.GetRegion(region.Id)
re.NotNil(r)
re.True(r.LoadedFromSync())
regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
re.Empty(regions)
err = tc.HandleRegionHeartbeat(regionInfo)
re.NoError(err)
regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
re.Len(regions, 1)

leaderServer.ResignLeader()
newLeaderName = tc.WaitLeader()
re.Equal(newLeaderName, leaderName)
leaderServer = tc.GetServer(newLeaderName)
rc = leaderServer.GetRaftCluster()
re.NotNil(r)
re.True(r.LoadedFromStorage() || r.LoadedFromSync())
regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
re.Empty(regions)
regionInfo = regionInfo.Clone(core.SetSource(core.Heartbeat), core.SetApproximateSize(30))
err = tc.HandleRegionHeartbeat(regionInfo)
re.NoError(err)
rc = leaderServer.GetRaftCluster()
r = rc.GetRegion(region.Id)
re.NotNil(r)
re.False(r.LoadedFromStorage() && r.LoadedFromSync())

leaderServer.ResignLeader()
newLeaderName = tc.WaitLeader()
re.NotEqual(newLeaderName, leaderName)
leaderServer.ResignLeader()
newLeaderName = tc.WaitLeader()
re.Equal(newLeaderName, leaderName)
leaderServer = tc.GetServer(newLeaderName)
rc = leaderServer.GetRaftCluster()
r = rc.GetRegion(region.Id)
re.NotNil(r)
re.False(r.LoadedFromStorage() && r.LoadedFromSync())
regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
re.Empty(regions)

regionInfo = regionInfo.Clone(core.SetSource(core.Heartbeat), core.SetApproximateSize(30))
err = tc.HandleRegionHeartbeat(regionInfo)
re.NoError(err)
regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
re.Len(regions, 1)
}

func TestStaleRegion(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())