Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schedule: add check action when polling the operators from opNotifierQueue #8010

Merged
merged 6 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions pkg/schedule/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,12 @@ func (oc *Controller) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) {
if !ok || op == nil {
return nil, true
}
r = oc.cluster.GetRegion(regionID)
if r == nil {
// Check the operator lightly. The op can't be dispatched in some scenarios.
var reason CancelReasonType
r, reason = oc.checkOperatorLightly(op)
if len(reason) != 0 {
_ = oc.removeOperatorLocked(op)
if op.Cancel(RegionNotFound) {
if op.Cancel(reason) {
log.Warn("remove operator because region disappeared",
zap.Uint64("region-id", op.RegionID()),
zap.Stringer("operator", op))
Expand Down Expand Up @@ -301,6 +303,7 @@ func (oc *Controller) AddWaitingOperator(ops ...*Operator) int {
if isMerge {
// count two merge operators as one, so wopStatus.ops[desc] should
// not be updated here
// TODO: call checkAddOperator ...
i++
added++
oc.wop.PutOperator(ops[i])
Expand Down Expand Up @@ -455,6 +458,27 @@ func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) (bool
return reason != Expired, reason
}

// checkOperatorLightly runs a cheap pre-dispatch validation of op on behalf of
// Controller::pollNeedDispatchRegion.
//
// On success it returns the region looked up via op.RegionID() together with
// an empty cancel reason. When the operator must not be dispatched it returns
// a nil region and the reason it should be cancelled with:
//   - RegionNotFound: the cluster no longer knows the region;
//   - EpochNotMatch: op is a merge operator and the region's epoch version is
//     newer than the one recorded in op.
//
// Every rejection also increments operatorCounter with the matching label.
func (oc *Controller) checkOperatorLightly(op *Operator) (*core.RegionInfo, CancelReasonType) {
	target := oc.cluster.GetRegion(op.RegionID())
	if target == nil {
		operatorCounter.WithLabelValues(op.Desc(), "not-found").Inc()
		return nil, RegionNotFound
	}

	// The epoch check would probably be valid for every kind of operator, but
	// to be cautious it is only applied to merge-region operators for now.
	if op.Kind()&OpMerge == 0 {
		return target, ""
	}

	// A newer epoch version means the region has been split or merged, so its
	// key range has changed. A conf_version change does not modify the key
	// range and is therefore deliberately ignored.
	if current, recorded := target.GetRegionEpoch().GetVersion(), op.RegionEpoch().GetVersion(); current > recorded {
		operatorCounter.WithLabelValues(op.Desc(), "epoch-not-match").Inc()
		return nil, EpochNotMatch
	}
	return target, ""
}

// isHigherPriorityOperator reports whether new has a strictly higher priority
// level than old. Operators with equal priority levels yield false.
func isHigherPriorityOperator(new, old *Operator) bool {
	newLevel, oldLevel := new.GetPriorityLevel(), old.GetPriorityLevel()
	return oldLevel < newLevel
}
Expand Down
125 changes: 125 additions & 0 deletions pkg/schedule/operator/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,131 @@ func (suite *operatorControllerTestSuite) TestPollDispatchRegion() {
re.False(next)
}

// TestPollDispatchRegionForMergeRegion is the regression test for issue #7992.
//
// It checks that pollNeedDispatchRegion drops both operators of a merge-region
// pair once the epoch version of either the source or the target region has
// advanced, and that the leftover entry in opNotifierQueue is cleaned up by a
// later poll.
func (suite *operatorControllerTestSuite) TestPollDispatchRegionForMergeRegion() {
	re := suite.Require()
	opts := mockconfig.NewTestOptions()
	cluster := mockcluster.NewCluster(suite.ctx, opts)
	stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */)
	controller := NewController(suite.ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), stream)
	cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
	cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
	cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})

	// Two adjacent regions, both starting with a zero-valued epoch.
	source := newRegionInfo(101, "1a", "1b", 10, 10, []uint64{101, 1}, []uint64{101, 1})
	source.GetMeta().RegionEpoch = &metapb.RegionEpoch{}
	cluster.PutRegion(source)
	target := newRegionInfo(102, "1b", "1c", 10, 10, []uint64{101, 1}, []uint64{101, 1})
	target.GetMeta().RegionEpoch = &metapb.RegionEpoch{}
	cluster.PutRegion(target)

	ops, err := CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge)
	re.NoError(err)
	re.Len(ops, 2)
	re.Equal(2, controller.AddWaitingOperator(ops...))
	// Change next push time to now, it's used to make test case faster.
	controller.opNotifierQueue[0].time = time.Now()

	// first poll gets source region op.
	r, next := controller.pollNeedDispatchRegion()
	re.True(next)
	re.Equal(source, r)

	// second poll gets target region op.
	controller.opNotifierQueue[0].time = time.Now()
	r, next = controller.pollNeedDispatchRegion()
	re.True(next)
	re.Equal(target, r)

	// third poll removes the two merge-region ops, because the source region's
	// epoch version is now newer than the one recorded in the operator.
	source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1}
	r, next = controller.pollNeedDispatchRegion()
	re.True(next)
	re.Nil(r)
	re.Len(controller.opNotifierQueue, 1)
	re.Empty(controller.operators)
	re.Empty(controller.wop.ListOperator())
	re.NotNil(controller.records.Get(101))
	re.NotNil(controller.records.Get(102))

	// fourth poll removes target region op from opNotifierQueue
	controller.opNotifierQueue[0].time = time.Now()
	r, next = controller.pollNeedDispatchRegion()
	re.True(next)
	re.Nil(r)
	re.Empty(controller.opNotifierQueue)

	// Add the two ops to waiting operators again. The source epoch is reset and
	// the records of the previous round are cleared first.
	source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 0}
	controller.records.ttl.Remove(101)
	controller.records.ttl.Remove(102)
	ops, err = CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge)
	re.NoError(err)
	re.Equal(2, controller.AddWaitingOperator(ops...))
	// change the target RegionEpoch
	// first poll gets source region from opNotifierQueue
	target.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1}
	controller.opNotifierQueue[0].time = time.Now()
	r, next = controller.pollNeedDispatchRegion()
	re.True(next)
	re.Equal(source, r)

	// second poll hits the target region op, whose epoch version has advanced:
	// both merge-region ops are removed again.
	r, next = controller.pollNeedDispatchRegion()
	re.True(next)
	re.Nil(r)
	re.Len(controller.opNotifierQueue, 1)
	re.Empty(controller.operators)
	re.Empty(controller.wop.ListOperator())
	re.NotNil(controller.records.Get(101))
	re.NotNil(controller.records.Get(102))

	// last poll drains the remaining entry from opNotifierQueue.
	controller.opNotifierQueue[0].time = time.Now()
	r, next = controller.pollNeedDispatchRegion()
	re.True(next)
	re.Nil(r)
	re.Empty(controller.opNotifierQueue)
}

// TestCheckOperatorLightly exercises the three outcomes of
// Controller.checkOperatorLightly for a merge-region operator pair: the check
// passes, the region has disappeared, and the region's epoch version is newer
// than the one recorded in the operator.
func (suite *operatorControllerTestSuite) TestCheckOperatorLightly() {
	re := suite.Require()
	opts := mockconfig.NewTestOptions()
	cluster := mockcluster.NewCluster(suite.ctx, opts)
	stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */)
	controller := NewController(suite.ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), stream)
	cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
	cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
	cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})

	// Two adjacent regions, both starting with a zero-valued epoch.
	source := newRegionInfo(101, "1a", "1b", 10, 10, []uint64{101, 1}, []uint64{101, 1})
	source.GetMeta().RegionEpoch = &metapb.RegionEpoch{}
	cluster.PutRegion(source)
	target := newRegionInfo(102, "1b", "1c", 10, 10, []uint64{101, 1}, []uint64{101, 1})
	target.GetMeta().RegionEpoch = &metapb.RegionEpoch{}
	cluster.PutRegion(target)

	ops, err := CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge)
	re.NoError(err)
	re.Len(ops, 2)

	// check successfully
	r, reason := controller.checkOperatorLightly(ops[0])
	re.Empty(reason)
	re.Equal(source, r)

	// check failed because of region disappeared
	cluster.RemoveRegion(target)
	r, reason = controller.checkOperatorLightly(ops[1])
	re.Nil(r)
	re.Equal(RegionNotFound, reason)

	// check failed because the version of the region epoch changed
	cluster.PutRegion(target)
	source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1}
	r, reason = controller.checkOperatorLightly(ops[0])
	re.Nil(r)
	re.Equal(EpochNotMatch, reason)
}

func (suite *operatorControllerTestSuite) TestStoreLimit() {
re := suite.Require()
opt := mockconfig.NewTestOptions()
Expand Down
Loading