From 247158f106d6ed440934ee7c4b31bc96a005eec1 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 7 Jun 2024 17:56:58 +0800 Subject: [PATCH] This is an automated cherry-pick of #8264 close tikv/pd#8263 Signed-off-by: ti-chi-bot --- pkg/schedule/operator/operator_controller.go | 5 ++++ pkg/schedule/operator/operator_test.go | 27 ++++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/pkg/schedule/operator/operator_controller.go b/pkg/schedule/operator/operator_controller.go index 23db4f88e44..0cc9da263da 100644 --- a/pkg/schedule/operator/operator_controller.go +++ b/pkg/schedule/operator/operator_controller.go @@ -438,8 +438,13 @@ func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) (bool operatorCounter.WithLabelValues(op.Desc(), "unexpected-status").Inc() return false, NotInCreateStatus } +<<<<<<< HEAD if !isPromoting && oc.wopStatus.ops[op.Desc()] >= oc.config.GetSchedulerMaxWaitingOperator() { log.Debug("exceed max return false", zap.Uint64("waiting", oc.wopStatus.ops[op.Desc()]), zap.String("desc", op.Desc()), zap.Uint64("max", oc.config.GetSchedulerMaxWaitingOperator())) +======= + if !isPromoting && oc.wopStatus.getCount(op.Desc()) >= oc.config.GetSchedulerMaxWaitingOperator() { + log.Debug("exceed max return false", zap.Uint64("waiting", oc.wopStatus.getCount(op.Desc())), zap.String("desc", op.Desc()), zap.Uint64("max", oc.config.GetSchedulerMaxWaitingOperator())) +>>>>>>> e767c012f (schedule: fix datarace in `operator.check` (#8264)) operatorCounter.WithLabelValues(op.Desc(), "exceed-max-waiting").Inc() return false, ExceedWaitLimit } diff --git a/pkg/schedule/operator/operator_test.go b/pkg/schedule/operator/operator_test.go index 828ab4be27d..6c0356a765e 100644 --- a/pkg/schedule/operator/operator_test.go +++ b/pkg/schedule/operator/operator_test.go @@ -17,7 +17,10 @@ package operator import ( "context" "encoding/json" +<<<<<<< HEAD "fmt" +======= +>>>>>>> e767c012f (schedule: fix datarace in `operator.check` (#8264)) "sync" "sync/atomic" "testing" @@ -553,3 +556,27 @@ func (suite *operatorTestSuite) TestOperatorCheckConcurrently() { } wg.Wait() } + +func TestOperatorCheckConcurrently(t *testing.T) { + re := require.New(t) + region := newTestRegion(1, 1, [2]uint64{1, 1}, [2]uint64{2, 2}) + // addPeer1, transferLeader1, removePeer3 + steps := []OpStep{ + AddPeer{ToStore: 1, PeerID: 1}, + TransferLeader{FromStore: 3, ToStore: 1}, + RemovePeer{FromStore: 3}, + } + op := NewTestOperator(1, &metapb.RegionEpoch{}, OpAdmin|OpLeader|OpRegion, steps...) + re.Equal(constant.Urgent, op.GetPriorityLevel()) + checkSteps(re, op, steps) + op.Start() + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + re.Nil(op.Check(region)) + }() + } + wg.Wait() +}