diff --git a/pkg/schedule/operator/operator.go b/pkg/schedule/operator/operator.go index de197c4fba4..4d57d4fc6c7 100644 --- a/pkg/schedule/operator/operator.go +++ b/pkg/schedule/operator/operator.go @@ -376,10 +376,11 @@ func (o *Operator) Check(region *core.RegionInfo) OpStep { defer func() { _ = o.CheckTimeout() }() for step := atomic.LoadInt32(&o.currentStep); int(step) < len(o.steps); step++ { if o.steps[int(step)].IsFinish(region) { - if atomic.CompareAndSwapInt64(&(o.stepsTime[step]), 0, time.Now().UnixNano()) { + current := time.Now() + if atomic.CompareAndSwapInt64(&(o.stepsTime[step]), 0, current.UnixNano()) { startTime, _ := o.getCurrentTimeAndStep() operatorStepDuration.WithLabelValues(reflect.TypeOf(o.steps[int(step)]).Name()). - Observe(time.Unix(0, o.stepsTime[step]).Sub(startTime).Seconds()) + Observe(current.Sub(startTime).Seconds()) } atomic.StoreInt32(&o.currentStep, step+1) } else { diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index a13da7fd349..ad45ea47ded 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -461,7 +461,7 @@ func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) (bool return false, NotInCreateStatus } if !isPromoting && oc.wopStatus.getCount(op.Desc()) >= oc.config.GetSchedulerMaxWaitingOperator() { - log.Debug("exceed max return false", zap.Uint64("waiting", oc.wopStatus.ops[op.Desc()]), zap.String("desc", op.Desc()), zap.Uint64("max", oc.config.GetSchedulerMaxWaitingOperator())) + log.Debug("exceed max return false", zap.Uint64("waiting", oc.wopStatus.getCount(op.Desc())), zap.String("desc", op.Desc()), zap.Uint64("max", oc.config.GetSchedulerMaxWaitingOperator())) operatorCounter.WithLabelValues(op.Desc(), "exceed-max-waiting").Inc() return false, ExceedWaitLimit } diff --git a/pkg/schedule/operator/operator_test.go b/pkg/schedule/operator/operator_test.go index 4719df9408b..c219a3bf5c3 100644 --- a/pkg/schedule/operator/operator_test.go +++ b/pkg/schedule/operator/operator_test.go @@ -17,6 +17,7 @@ package operator import ( "context" "encoding/json" + "sync" "sync/atomic" "testing" "time" @@ -574,3 +575,26 @@ func (suite *operatorTestSuite) TestToJSONObject() { obj = op.ToJSONObject() suite.Equal(TIMEOUT, obj.Status) } + +func (suite *operatorTestSuite) TestOperatorCheckConcurrently() { + region := suite.newTestRegion(1, 1, [2]uint64{1, 1}, [2]uint64{2, 2}) + // addPeer1, transferLeader1, removePeer3 + steps := []OpStep{ + AddPeer{ToStore: 1, PeerID: 1}, + TransferLeader{FromStore: 3, ToStore: 1}, + RemovePeer{FromStore: 3}, + } + op := NewTestOperator(1, &metapb.RegionEpoch{}, OpAdmin|OpLeader|OpRegion, steps...) + suite.Equal(constant.Urgent, op.GetPriorityLevel()) + suite.checkSteps(suite.Require(), op, steps) + op.Start() + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + suite.Nil(op.Check(region)) + }() + } + wg.Wait() +}