Skip to content

Commit

Permalink
scatter: fix the unexpected error code (#7075)
Browse files Browse the repository at this point in the history
close #7073

scatter: fix the unexpected error code

Signed-off-by: nolouch <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
nolouch and ti-chi-bot[bot] committed Sep 13, 2023
1 parent d0cebac commit 11b752e
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 26 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ error = '''
failed to get the source store
'''

["PD:common:ErrGetTargetStore"]
error = '''
failed to get the target store
'''

["PD:common:ErrIncorrectSystemTime"]
error = '''
incorrect system time
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
// common error in multiple packages
var (
ErrGetSourceStore = errors.Normalize("failed to get the source store", errors.RFCCodeText("PD:common:ErrGetSourceStore"))
ErrGetTargetStore = errors.Normalize("failed to get the target store", errors.RFCCodeText("PD:common:ErrGetTargetStore"))
ErrIncorrectSystemTime = errors.Normalize("incorrect system time", errors.RFCCodeText("PD:common:ErrIncorrectSystemTime"))
)

Expand Down
14 changes: 7 additions & 7 deletions pkg/schedule/scatter/region_scatterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,10 @@ func (r *RegionScatterer) Scatter(region *core.RegionInfo, group string, skipSto
return nil, errors.Errorf("region %d is hot", region.GetID())
}

return r.scatterRegion(region, group, skipStoreLimit), nil
return r.scatterRegion(region, group, skipStoreLimit)
}

func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, skipStoreLimit bool) *operator.Operator {
func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, skipStoreLimit bool) (*operator.Operator, error) {
engineFilter := filter.NewEngineFilter(r.name, filter.NotSpecialEngines)
ordinaryPeers := make(map[uint64]*metapb.Peer, len(region.GetPeers()))
specialPeers := make(map[string]map[uint64]*metapb.Peer)
Expand All @@ -296,7 +296,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s
for _, peer := range region.GetPeers() {
store := r.cluster.GetStore(peer.GetStoreId())
if store == nil {
return nil
return nil, errs.ErrGetSourceStore.FastGenByArgs(fmt.Sprintf("store not found, peer: %v, region id: %d", peer, region.GetID()))
}
if engineFilter.Target(r.cluster.GetSharedConfig(), store).IsOK() {
ordinaryPeers[peer.GetStoreId()] = peer
Expand Down Expand Up @@ -358,7 +358,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s
targetLeader, leaderStorePickedCount := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine)
if targetLeader == 0 {
scatterSkipNoLeaderCounter.Inc()
return nil
return nil, errs.ErrGetTargetStore.FastGenByArgs(fmt.Sprintf("no target leader store found, region: %v", region))
}

for engine, peers := range specialPeers {
Expand All @@ -375,7 +375,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s
if isSameDistribution(region, targetPeers, targetLeader) {
scatterUnnecessaryCounter.Inc()
r.Put(targetPeers, targetLeader, group)
return nil
return nil, nil
}
op, err := operator.CreateScatterRegionOperator("scatter-region", r.cluster, region, targetPeers, targetLeader, skipStoreLimit)
if err != nil {
Expand All @@ -385,7 +385,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s
}
r.Put(targetPeers, region.GetLeader().GetStoreId(), group)
log.Debug("fail to create scatter region operator", errs.ZapError(err))
return nil
return nil, errs.ErrCreateOperator.FastGenByArgs(fmt.Sprintf("failed to create scatter region operator for region %v", region.GetID()))
}
if op != nil {
scatterSuccessCounter.Inc()
Expand All @@ -394,7 +394,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s
op.AdditionalInfos["leader-picked-count"] = strconv.FormatUint(leaderStorePickedCount, 10)
op.SetPriorityLevel(constant.High)
}
return op
return op, nil
}

func allowLeader(fit *placement.RegionFit, peer *metapb.Peer) bool {
Expand Down
26 changes: 19 additions & 7 deletions pkg/schedule/scatter/region_scatterer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,18 @@ func scatter(re *require.Assertions, numStores, numRegions uint64, useRules bool
tc.AddLeaderRegion(i, 1, 2, 3)
}
scatterer := NewRegionScatterer(ctx, tc, oc, tc.AddSuspectRegions)

noNeedMoveNum := 0
for i := uint64(1); i <= numRegions; i++ {
region := tc.GetRegion(i)
if op, _ := scatterer.Scatter(region, "", false); op != nil {
if op, err := scatterer.Scatter(region, "", false); err == nil {
if op == nil {
noNeedMoveNum++
continue
}
checkOperator(re, op)
operator.ApplyOperator(tc, op)
} else {
re.Nil(op)
}
}

Expand Down Expand Up @@ -140,6 +146,7 @@ func scatter(re *require.Assertions, numStores, numRegions uint64, useRules bool
re.LessOrEqual(float64(count), 1.1*float64(numRegions)/float64(numStores))
re.GreaterOrEqual(float64(count), 0.9*float64(numRegions)/float64(numStores))
}
re.GreaterOrEqual(noNeedMoveNum, 0)
}

func scatterSpecial(re *require.Assertions, numOrdinaryStores, numSpecialStores, numRegions uint64) {
Expand Down Expand Up @@ -651,7 +658,8 @@ func TestSelectedStoresTooFewPeers(t *testing.T) {
// Try to scatter a region with peer store id 2/3/4
for i := uint64(1); i < 20; i++ {
region := tc.AddLeaderRegion(i+200, i%3+2, (i+1)%3+2, (i+2)%3+2)
op := scatterer.scatterRegion(region, group, false)
op, err := scatterer.scatterRegion(region, group, false)
re.NoError(err)
re.False(isPeerCountChanged(op))
if op != nil {
re.Equal(group, op.AdditionalInfos["group"])
Expand Down Expand Up @@ -691,7 +699,8 @@ func TestSelectedStoresTooManyPeers(t *testing.T) {
// test region with peer 1 2 3
for i := uint64(1); i < 20; i++ {
region := tc.AddLeaderRegion(i+200, i%3+1, (i+1)%3+1, (i+2)%3+1)
op := scatterer.scatterRegion(region, group, false)
op, err := scatterer.scatterRegion(region, group, false)
re.NoError(err)
re.False(isPeerCountChanged(op))
}
}
Expand All @@ -715,7 +724,8 @@ func TestBalanceLeader(t *testing.T) {
scatterer := NewRegionScatterer(ctx, tc, oc, tc.AddSuspectRegions)
for i := uint64(1001); i <= 1300; i++ {
region := tc.AddLeaderRegion(i, 2, 3, 4)
op := scatterer.scatterRegion(region, group, false)
op, err := scatterer.scatterRegion(region, group, false)
re.NoError(err)
re.False(isPeerCountChanged(op))
}
// all leader will be balanced in three stores.
Expand Down Expand Up @@ -745,7 +755,8 @@ func TestBalanceRegion(t *testing.T) {
scatterer := NewRegionScatterer(ctx, tc, oc, tc.AddSuspectRegions)
for i := uint64(1001); i <= 1300; i++ {
region := tc.AddLeaderRegion(i, 2, 4, 6)
op := scatterer.scatterRegion(region, group, false)
op, err := scatterer.scatterRegion(region, group, false)
re.NoError(err)
re.False(isPeerCountChanged(op))
}
for i := uint64(2); i <= 7; i++ {
Expand All @@ -754,7 +765,8 @@ func TestBalanceRegion(t *testing.T) {
// Test for unhealthy region
// ref https://github.com/tikv/pd/issues/6099
region := tc.AddLeaderRegion(1500, 2, 3, 4, 6)
op := scatterer.scatterRegion(region, group, false)
op, err := scatterer.scatterRegion(region, group, false)
re.NoError(err)
re.False(isPeerCountChanged(op))
}

Expand Down
19 changes: 7 additions & 12 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1709,18 +1709,13 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg
return nil, err
}

if op == nil {
return &pdpb.ScatterRegionResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
"operator could not be allocated"),
}, nil
}

if !rc.GetOperatorController().AddOperator(op) {
return &pdpb.ScatterRegionResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
"operator cancelled because store limit exceeded"),
}, nil
if op != nil {
if !rc.GetOperatorController().AddOperator(op) {
return &pdpb.ScatterRegionResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
"operator canceled because cannot add an operator to the execute queue"),
}, nil
}
}

return &pdpb.ScatterRegionResponse{
Expand Down

0 comments on commit 11b752e

Please sign in to comment.