Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schedule: add check action when polling the operators from opNotifierQueue #8010

Merged
merged 6 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions pkg/schedule/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,12 @@ func (oc *Controller) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) {
if !ok || op == nil {
return nil, true
}
r = oc.cluster.GetRegion(regionID)
if r == nil {
// Check the operator lightly. The op can't be dispatched in some scenarios.
var reason CancelReasonType
r, reason = oc.checkOperatorLightly(op)
if len(reason) != 0 {
_ = oc.removeOperatorLocked(op)
if op.Cancel(RegionNotFound) {
if op.Cancel(reason) {
log.Warn("remove operator because region disappeared",
zap.Uint64("region-id", op.RegionID()),
zap.Stringer("operator", op))
Expand Down Expand Up @@ -301,6 +303,7 @@ func (oc *Controller) AddWaitingOperator(ops ...*Operator) int {
if isMerge {
// count two merge operators as one, so wopStatus.ops[desc] should
// not be updated here
// TODO: call checkAddOperator ...
i++
added++
oc.wop.PutOperator(ops[i])
Expand Down Expand Up @@ -455,6 +458,27 @@ func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) (bool
return reason != Expired, reason
}

// checkOperatorLightly checks whether the ops can be dispatched in Controller::pollNeedDispatchRegion.
// The operators can't be dispatched for some scenarios, such as region disappeared, region changed ...
// `region` is the target region of `op`.
func (oc *Controller) checkOperatorLightly(op *Operator) (*core.RegionInfo, CancelReasonType) {
region := oc.cluster.GetRegion(op.RegionID())
if region == nil {
operatorCounter.WithLabelValues(op.Desc(), "not-found").Inc()
return nil, RegionNotFound
}

// This check may be suitable for all kinds of operators, not only merge-region.
// But to be cautious, it currently only takes effect on merge-region operators.
// If the version of the epoch has changed, the region has been split or merged, and its key range has changed.
// A change to the conf_version of the epoch doesn't modify the region key range, so skip it.
TonsnakeLin marked this conversation as resolved.
Show resolved Hide resolved
if (op.Kind()&OpMerge != 0) && region.GetRegionEpoch().GetVersion() != op.RegionEpoch().GetVersion() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to compare the epoch like checkStaleOperator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkStaleOperator only checks the ConfVersion of the region epoch, which may not resolve issue #7992. For example, "operator add split-region 258 --policy=approximate" will change the Version of region 258, but not its ConfVersion.
Another reason is that I don't know why checkStaleOperator is limited to the DispatchFromHeartBeat source: https://github.com/tikv/pd/blob/master/pkg/schedule/operator/operator_controller.go#L120

Copy link
Member

@rleungx rleungx Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, is it enough to only check the version? Or we also need to check both the version and conf version?

Copy link
Contributor Author

@TonsnakeLin TonsnakeLin Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the beginning, I checked both the version and the conf-version. But I think a changed conf-version doesn't mean the key range changed, so the merge operator can still be executed successfully. In the end, I check only the version of RegionEpoch.
Um... I'm not quite sure whether to consider the conf-version.
Could you give me some suggestions? @disksing @rleungx, thank you!!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, I think there will be a problem if we use != since the operator's epoch won't change after creating.

operatorCounter.WithLabelValues(op.Desc(), "epoch-not-match").Inc()
return nil, EpochNotMatch
}
return region, ""
}

// isHigherPriorityOperator reports whether the priority level of candidate is
// strictly greater than the priority level of current.
func isHigherPriorityOperator(candidate, current *Operator) bool {
	candidateLevel := candidate.GetPriorityLevel()
	currentLevel := current.GetPriorityLevel()
	return candidateLevel > currentLevel
}
Expand Down
85 changes: 85 additions & 0 deletions pkg/schedule/operator/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,91 @@ func (suite *operatorControllerTestSuite) TestPollDispatchRegion() {
re.False(next)
}

// TestPollDispatchRegionForMergeRegion is the regression test for issue #7992.
//
// It adds a pair of merge-region operators (source 101, target 102) to the
// controller, then changes the Version of one region's epoch after the
// operators were created and verifies that polling the notifier queue:
//   - returns a nil region for the stale operator,
//   - removes both merge operators from the running and waiting sets,
//   - leaves a record for both regions, and
//   - drains the remaining entry from opNotifierQueue on the following poll.
//
// The scenario is exercised twice: once with the source epoch changed and
// once with the target epoch changed.
func (suite *operatorControllerTestSuite) TestPollDispatchRegionForMergeRegion() {
	re := suite.Require()
	opts := mockconfig.NewTestOptions()
	cluster := mockcluster.NewCluster(suite.ctx, opts)
	stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */)
	controller := NewController(suite.ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), stream)
	cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
	cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
	cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})

	// Two adjacent regions, both starting with a zero-valued epoch.
	source := newRegionInfo(101, "1a", "1b", 10, 10, []uint64{101, 1}, []uint64{101, 1})
	source.GetMeta().RegionEpoch = &metapb.RegionEpoch{}
	cluster.PutRegion(source)
	target := newRegionInfo(102, "1b", "1c", 10, 10, []uint64{101, 1}, []uint64{101, 1})
	target.GetMeta().RegionEpoch = &metapb.RegionEpoch{}
	cluster.PutRegion(target)

	ops, err := CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge)
	re.NoError(err)
	re.Len(ops, 2)
	re.Equal(2, controller.AddWaitingOperator(ops...))
	// Change next push time to now, it's used to make test case faster.
	controller.opNotifierQueue[0].time = time.Now()

	// First poll gets the source region op.
	r, next := controller.pollNeedDispatchRegion()
	re.True(next)
	re.Equal(source, r)

	// Second poll gets the target region op.
	controller.opNotifierQueue[0].time = time.Now()
	r, next = controller.pollNeedDispatchRegion()
	re.True(next)
	re.Equal(target, r)

	// Third poll removes the two merge-region ops: the source epoch version
	// no longer matches the one the operators were created with.
	source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1}
	r, next = controller.pollNeedDispatchRegion()
	re.True(next)
	re.Nil(r)
	re.Len(controller.opNotifierQueue, 1)
	re.Empty(controller.operators)
	re.Empty(controller.wop.ListOperator())
	re.NotNil(controller.records.Get(101))
	re.NotNil(controller.records.Get(102))

	// Fourth poll removes the target region op from opNotifierQueue.
	controller.opNotifierQueue[0].time = time.Now()
	r, next = controller.pollNeedDispatchRegion()
	re.True(next)
	re.Nil(r)
	re.Empty(controller.opNotifierQueue)

	// Add the two ops to waiting operators again, after restoring the source
	// epoch and dropping the records left by the previous round.
	source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 0}
	controller.records.ttl.Remove(101)
	controller.records.ttl.Remove(102)
	ops, err = CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge)
	re.NoError(err)
	re.Equal(2, controller.AddWaitingOperator(ops...))
	// Change the target RegionEpoch this time.
	// First poll gets the source region from opNotifierQueue.
	target.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1}
	controller.opNotifierQueue[0].time = time.Now()
	r, next = controller.pollNeedDispatchRegion()
	re.True(next)
	re.Equal(source, r)

	// Second poll hits the target op, whose epoch version has changed, so
	// both merge-region ops are removed.
	r, next = controller.pollNeedDispatchRegion()
	re.True(next)
	re.Nil(r)
	re.Len(controller.opNotifierQueue, 1)
	re.Empty(controller.operators)
	re.Empty(controller.wop.ListOperator())
	re.NotNil(controller.records.Get(101))
	re.NotNil(controller.records.Get(102))

	// Final poll drains the remaining entry from opNotifierQueue.
	controller.opNotifierQueue[0].time = time.Now()
	r, next = controller.pollNeedDispatchRegion()
	re.True(next)
	re.Nil(r)
	re.Empty(controller.opNotifierQueue)
}

func (suite *operatorControllerTestSuite) TestStoreLimit() {
re := suite.Require()
opt := mockconfig.NewTestOptions()
Expand Down
Loading