From 0d3e22f87f74e5313955904b29ba1f261a0520b7 Mon Sep 17 00:00:00 2001 From: TonsnakeLin Date: Mon, 1 Apr 2024 02:25:43 -0400 Subject: [PATCH 1/5] add check action when poll the operators from opNotifierQueue Signed-off-by: TonsnakeLin --- pkg/schedule/operator/operator_controller.go | 30 ++++++- .../operator/operator_controller_test.go | 85 +++++++++++++++++++ 2 files changed, 112 insertions(+), 3 deletions(-) diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index 07cafb9c566..e8c891d560b 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -217,10 +217,12 @@ func (oc *Controller) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) { if !ok || op == nil { return nil, true } - r = oc.cluster.GetRegion(regionID) - if r == nil { + // Check the operator lightly. It can't dispatch the op in some scenarios. + var reason CancelReasonType + r, reason = oc.checkOperatorLightly(op) + if len(reason) != 0 { _ = oc.removeOperatorLocked(op) - if op.Cancel(RegionNotFound) { + if op.Cancel(reason) { log.Warn("remove operator because region disappeared", zap.Uint64("region-id", op.RegionID()), zap.Stringer("operator", op)) @@ -301,6 +303,7 @@ func (oc *Controller) AddWaitingOperator(ops ...*Operator) int { if isMerge { // count two merge operators as one, so wopStatus.ops[desc] should // not be updated here + // TODO: call checkAddOperator ... i++ added++ oc.wop.PutOperator(ops[i]) @@ -455,6 +458,27 @@ func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) (bool return reason != Expired, reason } +// checkOperatorLightly checks whether the ops can be dispatched in Controller::pollNeedDispatchRegion. +// The operators can't be dispatched for some scenarios, such as region disapeared, region changed ... +// `region` is the target region of `op`. 
+func (oc *Controller) checkOperatorLightly(op *Operator) (*core.RegionInfo, CancelReasonType) { + region := oc.cluster.GetRegion(op.RegionID()) + if region == nil { + operatorCounter.WithLabelValues(op.Desc(), "not-found").Inc() + return nil, RegionNotFound + } + + // It may be suitable for all kinds of operator but not merge-region. + // But to be cautions, it only takes effect on merge-region currently. + // If the version of epoch is changed, the region has been splitted or merged, and the key range has been changed. + // The changing for conf_version of epoch doesn't modify the region key range, skipt it. + if (op.Kind()&OpMerge != 0) && region.GetRegionEpoch().GetVersion() != op.RegionEpoch().GetVersion() { + operatorCounter.WithLabelValues(op.Desc(), "epoch-not-match").Inc() + return nil, EpochNotMatch + } + return region, "" +} + func isHigherPriorityOperator(new, old *Operator) bool { return new.GetPriorityLevel() > old.GetPriorityLevel() } diff --git a/pkg/schedule/operator/operator_controller_test.go b/pkg/schedule/operator/operator_controller_test.go index 6369dea897c..a57aaba13b1 100644 --- a/pkg/schedule/operator/operator_controller_test.go +++ b/pkg/schedule/operator/operator_controller_test.go @@ -407,6 +407,91 @@ func (suite *operatorControllerTestSuite) TestPollDispatchRegion() { re.False(next) } +// issue #7992 +func (suite *operatorControllerTestSuite) TestPollDispatchRegionForMergeRegion() { + re := suite.Require() + opts := mockconfig.NewTestOptions() + cluster := mockcluster.NewCluster(suite.ctx, opts) + stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */) + controller := NewController(suite.ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), stream) + cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"}) + cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"}) + cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"}) + + source := newRegionInfo(101, "1a", 
"1b", 1, 1, []uint64{101, 1}, []uint64{101, 1}) + source.GetMeta().RegionEpoch = &metapb.RegionEpoch{} + cluster.PutRegion(source) + target := newRegionInfo(102, "1b", "1c", 1, 1, []uint64{101, 1}, []uint64{101, 1}) + target.GetMeta().RegionEpoch = &metapb.RegionEpoch{} + cluster.PutRegion(target) + + ops, err := CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge) + re.NoError(err) + re.Len(ops, 2) + re.Equal(2, controller.AddWaitingOperator(ops...)) + // Change next push time to now, it's used to make test case faster. + controller.opNotifierQueue[0].time = time.Now() + + // first poll gets source region op. + r, next := controller.pollNeedDispatchRegion() + re.True(next) + re.Equal(r, source) + + // second poll gets target region op. + controller.opNotifierQueue[0].time = time.Now() + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Equal(r, target) + + // third poll removes the two merge-region ops. + source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1} + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Nil(r) + re.Equal(len(controller.opNotifierQueue), 1) + re.Equal(len(controller.operators), 0) + re.Equal(len(controller.wop.ListOperator()), 0) + re.NotNil(controller.records.Get(101)) + re.NotNil(controller.records.Get(102)) + + // fourth poll removes target region op from opNotifierQueue + controller.opNotifierQueue[0].time = time.Now() + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Nil(r) + re.Equal(len(controller.opNotifierQueue), 0) + + // Add the two ops to waiting operators again. 
+ source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 0} + controller.records.ttl.Remove(101) + controller.records.ttl.Remove(102) + ops, err = CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge) + re.Nil(err) + re.Equal(2, controller.AddWaitingOperator(ops...)) + // change the target RegionEpoch + // first poll gets source region from opNotifierQueue + target.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1} + controller.opNotifierQueue[0].time = time.Now() + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Equal(r, source) + + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Nil(r) + re.Equal(len(controller.opNotifierQueue), 1) + re.Equal(len(controller.operators), 0) + re.Equal(len(controller.wop.ListOperator()), 0) + re.NotNil(controller.records.Get(101)) + re.NotNil(controller.records.Get(102)) + + controller.opNotifierQueue[0].time = time.Now() + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Nil(r) + re.Equal(len(controller.opNotifierQueue), 0) +} + func (suite *operatorControllerTestSuite) TestStoreLimit() { re := suite.Require() opt := mockconfig.NewTestOptions() From 4c660768e013fa952626bf7e580529c7e11db99d Mon Sep 17 00:00:00 2001 From: TonsnakeLin Date: Mon, 1 Apr 2024 03:03:09 -0400 Subject: [PATCH 2/5] update Signed-off-by: TonsnakeLin --- pkg/schedule/operator/operator_controller.go | 2 +- .../operator/operator_controller_test.go | 22 +++++++++---------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index e8c891d560b..e4116b8e02d 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -459,7 +459,7 @@ func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) (bool } // checkOperatorLightly checks whether the ops can be dispatched in 
Controller::pollNeedDispatchRegion. -// The operators can't be dispatched for some scenarios, such as region disapeared, region changed ... +// The operators can't be dispatched for some scenarios, such as region disappeared, region changed ... // `region` is the target region of `op`. func (oc *Controller) checkOperatorLightly(op *Operator) (*core.RegionInfo, CancelReasonType) { region := oc.cluster.GetRegion(op.RegionID()) diff --git a/pkg/schedule/operator/operator_controller_test.go b/pkg/schedule/operator/operator_controller_test.go index a57aaba13b1..4cf53cdb205 100644 --- a/pkg/schedule/operator/operator_controller_test.go +++ b/pkg/schedule/operator/operator_controller_test.go @@ -418,10 +418,10 @@ func (suite *operatorControllerTestSuite) TestPollDispatchRegionForMergeRegion() cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"}) cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"}) - source := newRegionInfo(101, "1a", "1b", 1, 1, []uint64{101, 1}, []uint64{101, 1}) + source := newRegionInfo(101, "1a", "1b", 10, 10, []uint64{101, 1}, []uint64{101, 1}) source.GetMeta().RegionEpoch = &metapb.RegionEpoch{} cluster.PutRegion(source) - target := newRegionInfo(102, "1b", "1c", 1, 1, []uint64{101, 1}, []uint64{101, 1}) + target := newRegionInfo(102, "1b", "1c", 10, 10, []uint64{101, 1}, []uint64{101, 1}) target.GetMeta().RegionEpoch = &metapb.RegionEpoch{} cluster.PutRegion(target) @@ -448,9 +448,9 @@ func (suite *operatorControllerTestSuite) TestPollDispatchRegionForMergeRegion() r, next = controller.pollNeedDispatchRegion() re.True(next) re.Nil(r) - re.Equal(len(controller.opNotifierQueue), 1) - re.Equal(len(controller.operators), 0) - re.Equal(len(controller.wop.ListOperator()), 0) + re.Len(controller.opNotifierQueue, 1) + re.Empty(controller.operators) + re.Empty(controller.wop.ListOperator()) re.NotNil(controller.records.Get(101)) re.NotNil(controller.records.Get(102)) @@ -459,14 +459,14 @@ func (suite *operatorControllerTestSuite) 
TestPollDispatchRegionForMergeRegion() r, next = controller.pollNeedDispatchRegion() re.True(next) re.Nil(r) - re.Equal(len(controller.opNotifierQueue), 0) + re.Empty(controller.opNotifierQueue) // Add the two ops to waiting operators again. source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 0} controller.records.ttl.Remove(101) controller.records.ttl.Remove(102) ops, err = CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge) - re.Nil(err) + re.NoError(err) re.Equal(2, controller.AddWaitingOperator(ops...)) // change the target RegionEpoch // first poll gets source region from opNotifierQueue @@ -479,9 +479,9 @@ func (suite *operatorControllerTestSuite) TestPollDispatchRegionForMergeRegion() r, next = controller.pollNeedDispatchRegion() re.True(next) re.Nil(r) - re.Equal(len(controller.opNotifierQueue), 1) - re.Equal(len(controller.operators), 0) - re.Equal(len(controller.wop.ListOperator()), 0) + re.Len(controller.opNotifierQueue, 1) + re.Empty(controller.operators) + re.Empty(controller.wop.ListOperator()) re.NotNil(controller.records.Get(101)) re.NotNil(controller.records.Get(102)) @@ -489,7 +489,7 @@ func (suite *operatorControllerTestSuite) TestPollDispatchRegionForMergeRegion() r, next = controller.pollNeedDispatchRegion() re.True(next) re.Nil(r) - re.Equal(len(controller.opNotifierQueue), 0) + re.Empty(controller.opNotifierQueue) } func (suite *operatorControllerTestSuite) TestStoreLimit() { From 86698bdd86d3c81d8181c6abdcb52309eb7c2dc2 Mon Sep 17 00:00:00 2001 From: TonsnakeLin Date: Mon, 1 Apr 2024 09:17:36 -0400 Subject: [PATCH 3/5] update Signed-off-by: TonsnakeLin --- pkg/schedule/operator/operator_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index e4116b8e02d..a246d3b1005 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ 
-471,7 +471,7 @@ func (oc *Controller) checkOperatorLightly(op *Operator) (*core.RegionInfo, Canc // It may be suitable for all kinds of operator but not merge-region. // But to be cautions, it only takes effect on merge-region currently. // If the version of epoch is changed, the region has been splitted or merged, and the key range has been changed. - // The changing for conf_version of epoch doesn't modify the region key range, skipt it. + // The changing for conf_version of epoch doesn't modify the region key range, skip it. if (op.Kind()&OpMerge != 0) && region.GetRegionEpoch().GetVersion() != op.RegionEpoch().GetVersion() { operatorCounter.WithLabelValues(op.Desc(), "epoch-not-match").Inc() return nil, EpochNotMatch From bd153b25b0536f961a629679beeac0cda62a779a Mon Sep 17 00:00:00 2001 From: TonsnakeLin Date: Mon, 1 Apr 2024 21:57:15 -0400 Subject: [PATCH 4/5] add test case Signed-off-by: TonsnakeLin --- .../operator/operator_controller_test.go | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/pkg/schedule/operator/operator_controller_test.go b/pkg/schedule/operator/operator_controller_test.go index 4cf53cdb205..643dbda9d73 100644 --- a/pkg/schedule/operator/operator_controller_test.go +++ b/pkg/schedule/operator/operator_controller_test.go @@ -492,6 +492,46 @@ func (suite *operatorControllerTestSuite) TestPollDispatchRegionForMergeRegion() re.Empty(controller.opNotifierQueue) } +func (suite *operatorControllerTestSuite) TestCheckOperatorLightly() { + re := suite.Require() + opts := mockconfig.NewTestOptions() + cluster := mockcluster.NewCluster(suite.ctx, opts) + stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */) + controller := NewController(suite.ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), stream) + cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"}) + cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"}) + cluster.AddLabelsStore(3, 1, 
map[string]string{"host": "host3"}) + + source := newRegionInfo(101, "1a", "1b", 10, 10, []uint64{101, 1}, []uint64{101, 1}) + source.GetMeta().RegionEpoch = &metapb.RegionEpoch{} + cluster.PutRegion(source) + target := newRegionInfo(102, "1b", "1c", 10, 10, []uint64{101, 1}, []uint64{101, 1}) + target.GetMeta().RegionEpoch = &metapb.RegionEpoch{} + cluster.PutRegion(target) + + ops, err := CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge) + re.NoError(err) + re.Len(ops, 2) + + // check succeeds + r, reason := controller.checkOperatorLightly(ops[0]) + re.Empty(reason) + re.Equal(r, source) + + // check fails because the region disappeared + cluster.RemoveRegion(target) + r, reason = controller.checkOperatorLightly(ops[1]) + re.Nil(r) + re.Equal(reason, RegionNotFound) + + // check fails because the version of the region epoch changed + cluster.PutRegion(target) + source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1} + r, reason = controller.checkOperatorLightly(ops[0]) + re.Nil(r) + re.Equal(reason, EpochNotMatch) +} + func (suite *operatorControllerTestSuite) TestStoreLimit() { re := suite.Require() opt := mockconfig.NewTestOptions() From 00cfdafc4bda89e38e497b24364225df0fee80f1 Mon Sep 17 00:00:00 2001 From: TonsnakeLin Date: Mon, 1 Apr 2024 22:42:01 -0400 Subject: [PATCH 5/5] change the cmp method from ne to gt. Signed-off-by: TonsnakeLin --- pkg/schedule/operator/operator_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index a246d3b1005..f5e86f812c9 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -472,7 +472,7 @@ func (oc *Controller) checkOperatorLightly(op *Operator) (*core.RegionInfo, Canc // But to be cautions, it only takes effect on merge-region currently. 
// If the version of epoch is changed, the region has been splitted or merged, and the key range has been changed. // The changing for conf_version of epoch doesn't modify the region key range, skip it. - if (op.Kind()&OpMerge != 0) && region.GetRegionEpoch().GetVersion() != op.RegionEpoch().GetVersion() { + if (op.Kind()&OpMerge != 0) && region.GetRegionEpoch().GetVersion() > op.RegionEpoch().GetVersion() { operatorCounter.WithLabelValues(op.Desc(), "epoch-not-match").Inc() return nil, EpochNotMatch }