Skip to content

Commit

Permalink
schedule: add check action when poll the opeators from opNotifierQueue (
Browse files Browse the repository at this point in the history
#8010) (#8179)

close #7992

Signed-off-by: TonsnakeLin <[email protected]>

Co-authored-by: TonsnakeLin <[email protected]>
  • Loading branch information
ti-chi-bot and TonsnakeLin authored May 20, 2024
1 parent 7eb188c commit 5a3d21d
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 3 deletions.
30 changes: 27 additions & 3 deletions pkg/schedule/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,12 @@ func (oc *Controller) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) {
if !ok || op == nil {
return nil, true
}
r = oc.cluster.GetRegion(regionID)
if r == nil {
// Check the operator lightly. It cant't dispatch the op for some scenario.
var reason CancelReasonType
r, reason = oc.checkOperatorLightly(op)
if len(reason) != 0 {
_ = oc.removeOperatorLocked(op)
if op.Cancel(RegionNotFound) {
if op.Cancel(reason) {
log.Warn("remove operator because region disappeared",
zap.Uint64("region-id", op.RegionID()),
zap.Stringer("operator", op))
Expand Down Expand Up @@ -301,6 +303,7 @@ func (oc *Controller) AddWaitingOperator(ops ...*Operator) int {
if isMerge {
// count two merge operators as one, so wopStatus.ops[desc] should
// not be updated here
// TODO: call checkAddOperator ...
i++
added++
oc.wop.PutOperator(ops[i])
Expand Down Expand Up @@ -455,6 +458,27 @@ func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) (bool
return reason != Expired, reason
}

// checkOperatorLightly checks whether the ops can be dispatched in Controller::pollNeedDispatchRegion.
// The operators can't be dispatched for some scenarios, such as region disappeared, region changed ...
// `region` is the target region of `op`.
func (oc *Controller) checkOperatorLightly(op *Operator) (*core.RegionInfo, CancelReasonType) {
region := oc.cluster.GetRegion(op.RegionID())
if region == nil {
operatorCounter.WithLabelValues(op.Desc(), "not-found").Inc()
return nil, RegionNotFound
}

// It may be suitable for all kinds of operator but not merge-region.
// But to be cautions, it only takes effect on merge-region currently.
// If the version of epoch is changed, the region has been splitted or merged, and the key range has been changed.
// The changing for conf_version of epoch doesn't modify the region key range, skip it.
if (op.Kind()&OpMerge != 0) && region.GetRegionEpoch().GetVersion() > op.RegionEpoch().GetVersion() {
operatorCounter.WithLabelValues(op.Desc(), "epoch-not-match").Inc()
return nil, EpochNotMatch
}
return region, ""
}

func isHigherPriorityOperator(new, old *Operator) bool {
return new.GetPriorityLevel() > old.GetPriorityLevel()
}
Expand Down
125 changes: 125 additions & 0 deletions pkg/schedule/operator/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,131 @@ func (suite *operatorControllerTestSuite) TestPollDispatchRegion() {
suite.False(next)
}

// issue #7992
func (suite *operatorControllerTestSuite) TestPollDispatchRegionForMergeRegion() {
re := suite.Require()
opts := mockconfig.NewTestOptions()
cluster := mockcluster.NewCluster(suite.ctx, opts)
stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */)
controller := NewController(suite.ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), stream)
cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})

source := newRegionInfo(101, "1a", "1b", 10, 10, []uint64{101, 1}, []uint64{101, 1})
source.GetMeta().RegionEpoch = &metapb.RegionEpoch{}
cluster.PutRegion(source)
target := newRegionInfo(102, "1b", "1c", 10, 10, []uint64{101, 1}, []uint64{101, 1})
target.GetMeta().RegionEpoch = &metapb.RegionEpoch{}
cluster.PutRegion(target)

ops, err := CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge)
re.NoError(err)
re.Len(ops, 2)
re.Equal(2, controller.AddWaitingOperator(ops...))
// Change next push time to now, it's used to make test case faster.
controller.opNotifierQueue[0].time = time.Now()

// first poll gets source region op.
r, next := controller.pollNeedDispatchRegion()
re.True(next)
re.Equal(r, source)

// second poll gets target region op.
controller.opNotifierQueue[0].time = time.Now()
r, next = controller.pollNeedDispatchRegion()
re.True(next)
re.Equal(r, target)

// third poll removes the two merge-region ops.
source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1}
r, next = controller.pollNeedDispatchRegion()
re.True(next)
re.Nil(r)
re.Len(controller.opNotifierQueue, 1)
re.Empty(controller.operators)
re.Empty(controller.wop.ListOperator())
re.NotNil(controller.records.Get(101))
re.NotNil(controller.records.Get(102))

// fourth poll removes target region op from opNotifierQueue
controller.opNotifierQueue[0].time = time.Now()
r, next = controller.pollNeedDispatchRegion()
re.True(next)
re.Nil(r)
re.Empty(controller.opNotifierQueue)

// Add the two ops to waiting operators again.
source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 0}
controller.records.ttl.Remove(101)
controller.records.ttl.Remove(102)
ops, err = CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge)
re.NoError(err)
re.Equal(2, controller.AddWaitingOperator(ops...))
// change the target RegionEpoch
// first poll gets source region from opNotifierQueue
target.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1}
controller.opNotifierQueue[0].time = time.Now()
r, next = controller.pollNeedDispatchRegion()
re.True(next)
re.Equal(r, source)

r, next = controller.pollNeedDispatchRegion()
re.True(next)
re.Nil(r)
re.Len(controller.opNotifierQueue, 1)
re.Empty(controller.operators)
re.Empty(controller.wop.ListOperator())
re.NotNil(controller.records.Get(101))
re.NotNil(controller.records.Get(102))

controller.opNotifierQueue[0].time = time.Now()
r, next = controller.pollNeedDispatchRegion()
re.True(next)
re.Nil(r)
re.Empty(controller.opNotifierQueue)
}

func (suite *operatorControllerTestSuite) TestCheckOperatorLightly() {
re := suite.Require()
opts := mockconfig.NewTestOptions()
cluster := mockcluster.NewCluster(suite.ctx, opts)
stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */)
controller := NewController(suite.ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), stream)
cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})

source := newRegionInfo(101, "1a", "1b", 10, 10, []uint64{101, 1}, []uint64{101, 1})
source.GetMeta().RegionEpoch = &metapb.RegionEpoch{}
cluster.PutRegion(source)
target := newRegionInfo(102, "1b", "1c", 10, 10, []uint64{101, 1}, []uint64{101, 1})
target.GetMeta().RegionEpoch = &metapb.RegionEpoch{}
cluster.PutRegion(target)

ops, err := CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge)
re.NoError(err)
re.Len(ops, 2)

// check successfully
r, reason := controller.checkOperatorLightly(ops[0])
re.Empty(reason)
re.Equal(r, source)

// check failed because of region disappeared
cluster.RemoveRegion(target)
r, reason = controller.checkOperatorLightly(ops[1])
re.Nil(r)
re.Equal(reason, RegionNotFound)

// check failed because of verions of region epoch changed
cluster.PutRegion(target)
source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1}
r, reason = controller.checkOperatorLightly(ops[0])
re.Nil(r)
re.Equal(reason, EpochNotMatch)
}

func (suite *operatorControllerTestSuite) TestStoreLimit() {
opt := mockconfig.NewTestOptions()
tc := mockcluster.NewCluster(suite.ctx, opt)
Expand Down

0 comments on commit 5a3d21d

Please sign in to comment.